iot_water_monitor_v2
Dependencies: easy-connect-v16 Watchdog FP MQTTPacket RecordType-v-16 watersenor_and_temp_code
Diff: Simple-MQTT/SimpleMQTT.h
- Revision:
- 32:8226837c56ae
- Parent:
- 31:0f7ea3981668
- Child:
- 33:5b90257d2d57
diff -r 0f7ea3981668 -r 8226837c56ae Simple-MQTT/SimpleMQTT.h --- a/Simple-MQTT/SimpleMQTT.h Mon Jan 08 22:09:56 2018 +0700 +++ b/Simple-MQTT/SimpleMQTT.h Mon Jan 08 18:07:56 2018 +0000 @@ -1,369 +1,443 @@ -#ifndef __SIMPLEMQTT_H__ -#define __SIMPLEMQTT_H__ - -/*************************************************************** - * Includes - ***************************************************************/ -#include "easy-connect.h" -#include "MQTTClient.h" -#include "NDefLib/NDefNfcTag.h" -#include "NDefLib/RecordType/RecordURI.h" -#include "MQTTNetwork.h" -#include "MQTTmbed.h" - -#include "Json.h" -#include "CommandExecution.h" -/*************************************************************** - * Definitions - ***************************************************************/ - // Configuration values needed to connect to IBM IoT Cloud -#define ORG MQTT_ORG_ID // connect to ORG.internetofthings.ibmcloud.com/ For a registered connection, replace with your org -#define ID MQTT_DEVICE_ID // For a registered connection is your device id -#define AUTH_TOKEN MQTT_DEVICE_PASSWORD // For a registered connection is a device auth-token -#define DEFAULT_TYPE_NAME MQTT_DEVICE_TYPE // For a registered connection is device type -#define AUTH_METHOD MQTT_USERNAME - -#define TYPE DEFAULT_TYPE_NAME // For a registered connection, replace with your type -#define IBM_IOT_PORT MQTT_PORT - -#define MQTT_MAX_PACKET_SIZE 400 -#define MQTT_MAX_PAYLOAD_SIZE 300 - -/*************************************************************** - * Variables - ***************************************************************/ -typedef enum { - ADC_VALUE = 0, - SENSOR_VALUE, - RELAY_STATE, - CONFIG_VALUE -} UploadType; - -struct UploadValue { - float ADC_PHVal; - float ADC_DOVal; - - int RELAY_State_1; - int RELAY_State_2; - - uint8_t CONFIG_Mode; - uint8_t CONFIG_MinOxi; - uint8_t CONFIG_MaxOxi; - uint16_t CONFIG_UploadInterval; -} UploadValue; - -char *projectName = "WaterMonitor"; -static char id[30] = ID; // mac without colons -static char org[12] = ORG; -static char type[30] = TYPE; -static char auth_token[30] = AUTH_TOKEN; // Auth_token is only used in non-quickstart mode -static int connack_rc = 0; // MQTT connack return code -static bool netConnecting = false; -static bool mqttConnecting = false; -static bool netConnected = false; -static bool connected = false; -static int retryAttempt = 0; -static int connectTimeout = 1000; -uint16_t commandID = 0; -static char subscription_url[MQTT_MAX_PAYLOAD_SIZE]; - -/*************************************************************** - * Unity function definitions - ***************************************************************/ -void MQTT_MessageHandles(uint8_t ControlSignal); - -/** brief Callback function when MQTT message arrives - * param[in] msgMQTT - * retral None - */ -void MQTT_SubscribeCallback(MQTT::MessageData & msgMQTT); - -/** brief Subscribe to a MQTT topic and set the MQTT callback function - * param[in] subscribeTopic Topic to be subscribed - * param[in] client MQTT client - * retral returnCode from MQTTClient.h - */ -int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client); - -/** brief Connect to the internet then the MQTT network - * param[in] client MQTT client - * param[in] mqttNetwork MQTT network - * param[in] network The internet network interface (ethernet, wifi...) - * retral Internet connect result and returnCode from MQTTClient.h - */ -int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network); - -/** brief Setup the number of attempt to re-connect to the internet - * param[in] attemptNumber The number of attemp - */ -int MQTT_GetConnTimeout(int attemptNumber); - -/** brief Try to reconnect to the internet and MQTT network - * retral None - */ -void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network); - -/** brief Publish ADC values to the server - * param[in] client MQTT client - * param[in] inputTime The time when the data is attempt to be sent - * param[in] adcVal_0 The ADC value to be sent - * retral returnCode from MQTTClient.h - */ -int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0); - -/** brief Publish relay states to the server - * param[in] client MQTT client - * param[in] inputTime The time when the data is attempt to be sent - * param[in] relay1 Relay 1 state - * param[in] relay2 Relay 2 state - * retral returnCode from MQTTClient.h - */ -int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2); - -/** brief Publish relay states to the server - * param[in] client MQTT client - * param[in] inputTime The time when the data is attempt to be sent - * param[in] mode current mode: automatic (0) or manual (1) - * param[in] maxOxi Maximum Oxygen value - * param[in] minOxi Minimum Oxygen value -// * param[in] uploadInterval Interval between upload turns - * retral returnCode from MQTTClient.h - */ -int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi); - -/** brief Upload all the data to the MQTT server - * param[in] client MQTT client - * param[in] inputTime The time when the data is attempt to be sent - * param[in] uploadInterval The period between each upload moment - * retral returnCode from MQTTClient.h - */ -int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct); - -/********************************************************************************************************************************************************************************************/ -/*************************************************************** - * Unity function declarations - ***************************************************************/ -void MQTT_SubscribeCallback(MQTT::MessageData & msgMQTT) { - // Message Handles - char msg[MQTT_MAX_PAYLOAD_SIZE]; - msg[0]='\0'; - strncat (msg, (char*)msgMQTT.message.payload, msgMQTT.message.payloadlen); - printf ("--->>> MQTT_SubscribeCallback msg: %s\n\r", msg); - //{"type":"3","deviceId":"string"} - Json json(msg, msgMQTT.message.payloadlen); - if (!json.isValidJson()) { - printf("Invalid JSON: %s", msg); - } - else { - if (json.type(0) != JSMN_OBJECT ) { - printf("Invalid JSON. ROOT element is not Object: %s", msg); - } - else { - int CommandType; - int KeyIndex = json.findKeyIndexIn("type", 0); - int KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); - int ret = json.tokenIntegerValue(KeyValueIndex, CommandType); - printf("Command Type: %d, error %d\r\n", CommandType, ret); - - switch (CommandType) { - case 3: - break; - - case 4: - break; - - default: break; - } - } - } -} - -int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client) { - return client->subscribe(subscribeTopic, MQTT::QOS1, MQTT_SubscribeCallback); -} - -int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network) { - const char* iot_ibm = MQTT_BROKER_URL; - char hostname[strlen(org) + strlen(iot_ibm) + 1]; - - sprintf(hostname, "%s%s", org, iot_ibm); - // Construct clientId - d:org:type:id - char clientId[strlen(org) + strlen(type) + strlen(id) + 5]; - sprintf(clientId, "d:%s:%s:%s", org, type, id); - sprintf(subscription_url, "%s.%s/#/device/%s/%s/", org, "internetofthings.ibmcloud.com", id, DEFAULT_TYPE_NAME); - - // Network debug statements - LOG("=====================================\n\r"); - LOG("Nucleo IP ADDRESS: %s\n\r", network->get_ip_address()); - LOG("Nucleo MAC ADDRESS: %s\n\r", network->get_mac_address()); - LOG("Server Hostname: %s port: %d\n\r", hostname, IBM_IOT_PORT); - LOG("Client ID: %s\n\r", clientId); - LOG("Topic: %s\n\r",MQTT_EVENT_TOPIC); - LOG("Subscription URL: %s\n\r", subscription_url); - LOG("=====================================\n\r"); - netConnecting = true; - int rc = mqttNetwork->connect(hostname, IBM_IOT_PORT); - if (rc != 0) { - printf("rc from TCP connect is %d\r\n", rc); - return rc; - } - - printf ("--->TCP Connected\n\r"); - netConnected = true; - netConnecting = false; - - // MQTT Connect - mqttConnecting = true; - MQTTPacket_connectData data = MQTTPacket_connectData_initializer; - data.MQTTVersion = 4; - data.struct_version = 0; - data.clientID.cstring = clientId; - data.keepAliveInterval = MQTT_KEEPALIVE; // in Sec - data.username.cstring = AUTH_METHOD; - data.password.cstring = auth_token; - printf ("AutToken: %s\n\r", auth_token); - - if ((rc = client->connect(data)) != 0) { - printf("rc from MQTT connect is %d\r\n", rc); - connack_rc = rc; - return rc; - } - connected = true; - printf ("--->MQTT Connected\n\r"); - if ((rc = MQTT_Subscribe(MQTT_COMMAND_TOPIC, client)) == 0) { - LOG ("--->>>MQTT subscribed to: %s\n\r", MQTT_COMMAND_TOPIC); - } else { - LOG ("--->>>ERROR MQTT subscribe : %s\n\r", MQTT_COMMAND_TOPIC); - } - mqttConnecting = false; - connack_rc = rc; - return rc; -} - - -int MQTT_GetConnTimeout(int attemptNumber) { // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute - // after 20 attempts, retry every 10 minutes - return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600; -} - - -void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network) { - connected = false; - - while (MQTT_Connect(client, mqttNetwork, network) != MQTT_CONNECTION_ACCEPTED) { - if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD) { - printf ("File: %s, Line: %d Error: %d\n\r",__FILE__,__LINE__, connack_rc); - return; // don't reattempt to connect if credentials are wrong - } - int timeout = MQTT_GetConnTimeout(++retryAttempt); - WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout); - - // if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed - // or maybe just add the proper members to do this disconnect and call MQTT_AttemptConnect(...) - // this works - reset the system when the retry count gets to a threshold - if (retryAttempt == 5) - NVIC_SystemReset(); - else - wait(timeout); - } -} - -int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0) { - MQTT::Message message; - const char* pubTopic = MQTT_EVENT_TOPIC; - - char buf[MQTT_MAX_PAYLOAD_SIZE]; - char timeBuf[50]; - - if (!client->isConnected()) { - printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE; - } - - strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); -// sprintf(buf, -// "{\"Project\":\"%s\",\"Time\":\"%s\",\"Type\":1,\"cmdID\":%d,\"ADC0\":%0.2f}", -// projectName, timeBuf, commandID, adcVal_0); - sprintf(buf, "{\"type\":1,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"adc0\":%0.2f}", - timeBuf, commandID, adcVal_0); - message.qos = MQTT::QOS0; - message.retained = false; - message.dup = false; - message.payload = (void*)buf; - message.payloadlen = strlen(buf); - - if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) - printf("message too long!\r\n"); - - LOG("Publishing %s\n\r", buf); - return client->publish(pubTopic, message); -} - -int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2) { - MQTT::Message message; - const char* pubTopic = MQTT_EVENT_TOPIC; - char buf[MQTT_MAX_PAYLOAD_SIZE]; - char timeBuf[50]; - - if (!client->isConnected()) { - printf ("---> MQTT DISCONNECTED\n\r"); - return MQTT::FAILURE; - } - strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); - sprintf(buf, "{\"type\":3,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"relay1\":%d,\"relay2\":%d}", - timeBuf, commandID, relay1, relay2); - message.qos = MQTT::QOS0; - message.retained = false; - message.dup = false; - message.payload = (void*)buf; - message.payloadlen = strlen(buf); - - if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) - printf("message too long!\r\n"); - - LOG("Publishing %s\n\r", buf); - return client->publish(pubTopic, message); -} - -int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi) { - MQTT::Message message; - const char* pubTopic = MQTT_EVENT_TOPIC; - char buf[MQTT_MAX_PAYLOAD_SIZE]; - char timeBuf[50]; - - if (!client->isConnected()) { - printf ("---> MQTT DISCONNECTED\n\r"); - return MQTT::FAILURE; - } - strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); - sprintf(buf, "{\"type\":4,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"mode\":%d,\"minOxygenVal\":%d,\"maxOxygenVal\":%d}", - timeBuf, commandID, mode, minOxi, maxOxi); - message.qos = MQTT::QOS0; - message.retained = false; - message.dup = false; - message.payload = (void*)buf; - message.payloadlen = strlen(buf); - - if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) - printf("message too long!\r\n"); - - LOG("Publishing %s\n\r", buf); - return client->publish(pubTopic, message); -} - -int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct) { - int retVal; - switch (uploadType) { - case (ADC_VALUE): retVal = MQTT_PublishADC(client, inputTime, uploadStruct.ADC_PHVal); - break; - case (SENSOR_VALUE): retVal = MQTT::SUCCESS; - break; - case (RELAY_STATE): retVal = MQTT_PublishRelayState(client, inputTime, uploadStruct.RELAY_State_1, uploadStruct.RELAY_State_2); - break; - case (CONFIG_VALUE): retVal = MQTT_PublishConfigValue(client, inputTime, uploadStruct.CONFIG_Mode, uploadStruct.CONFIG_MinOxi, uploadStruct.CONFIG_MaxOxi); - break; - default: break; - } - return retVal; -} - -#endif /* __SIMPLEMQTT_H__ */ +#ifndef __SIMPLEMQTT_H__ +#define __SIMPLEMQTT_H__ + +/*************************************************************** + * Includes + ***************************************************************/ +#include "easy-connect.h" +#include "MQTTClient.h" +#include "NDefLib/NDefNfcTag.h" +#include "NDefLib/RecordType/RecordURI.h" +#include "MQTTNetwork.h" +#include "MQTTmbed.h" + +#include "Json.h" +#include "CommandExecution.h" +#include "flash_programming.h" +/*************************************************************** + * Definitions + ***************************************************************/ + // Configuration values needed to connect to IBM IoT Cloud +#define ORG MQTT_ORG_ID // connect to ORG.internetofthings.ibmcloud.com/ For a registered connection, replace with your org +#define ID MQTT_DEVICE_ID // For a registered connection is your device id +#define AUTH_TOKEN MQTT_DEVICE_PASSWORD // For a registered connection is a device auth-token +#define DEFAULT_TYPE_NAME MQTT_DEVICE_TYPE // For a registered connection is device type +#define AUTH_METHOD MQTT_USERNAME + +#define TYPE DEFAULT_TYPE_NAME // For a registered connection, replace with your type +#define IBM_IOT_PORT MQTT_PORT + +#define MQTT_MAX_PACKET_SIZE 400 +#define MQTT_MAX_PAYLOAD_SIZE 300 + +/*************************************************************** + * Variables + ***************************************************************/ +typedef enum { + ADC_VALUE = 0, + SENSOR_VALUE, + RELAY_STATE, + CONFIG_VALUE +} UploadType; + +struct UploadValue { + float ADC_PHVal; + float ADC_DOVal; + + float SENSOR_PHVal; + float SENSOR_D0Val; + + int RELAY_State_1; + int RELAY_State_2; + + uint8_t CONFIG_Mode; + uint8_t CONFIG_MinOxi; + uint8_t CONFIG_MaxOxi; + uint16_t CONFIG_UploadInterval; +} UploadValue; + +char *projectName = "WaterMonitor"; +static char id[30] = ID; // mac without colons +static char org[12] = ORG; +static char type[30] = TYPE; +static char auth_token[30] = AUTH_TOKEN; // Auth_token is only used in non-quickstart mode +static int connack_rc = 0; // MQTT connack return code +static bool netConnecting = false; +static bool mqttConnecting = false; +static bool netConnected = false; +static bool connected = false; +static int retryAttempt = 0; +static int connectTimeout = 1000; +uint16_t commandID = 0; +static char subscription_url[MQTT_MAX_PAYLOAD_SIZE]; + +extern struct UploadValue DataStruct; +/*************************************************************** + * Unity function definitions + ***************************************************************/ +void MQTT_MessageHandles(uint8_t ControlSignal); + +/** brief Callback function when MQTT message arrives + * param[in] msgMQTT + * retral None + */ +void MQTT_SubscribeCallback(MQTT::MessageData &msgMQTT); + +/** brief Subscribe to a MQTT topic and set the MQTT callback function + * param[in] subscribeTopic Topic to be subscribed + * param[in] client MQTT client + * retral returnCode from MQTTClient.h + */ +int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, struct UploadValue uploadStruct); + +/** brief Connect to the internet then the MQTT network + * param[in] client MQTT client + * param[in] mqttNetwork MQTT network + * param[in] network The internet network interface (ethernet, wifi...) + * retral Internet connect result and returnCode from MQTTClient.h + */ +int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct); + +/** brief Setup the number of attempt to re-connect to the internet + * param[in] attemptNumber The number of attemp + */ +int MQTT_GetConnTimeout(int attemptNumber); + +/** brief Try to reconnect to the internet and MQTT network + * retral None + */ +void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct); + +/** brief Publish ADC values to the server + * param[in] client MQTT client + * param[in] inputTime The time when the data is attempt to be sent + * param[in] adcVal_0 The ADC value to be sent + * retral returnCode from MQTTClient.h + */ +int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0); + +/** brief Publish Sensor values to the server + * param[in] client MQTT client + * param[in] inputTime The time when the data is attempt to be sent + * param[in] pHVal The pHVal value to be sent + * retral returnCode from MQTTClient.h + */ +int MQTT_PublishSensorVal(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float pHVal); + +/** brief Publish relay states to the server + * param[in] client MQTT client + * param[in] inputTime The time when the data is attempt to be sent + * param[in] relay1 Relay 1 state + * param[in] relay2 Relay 2 state + * retral returnCode from MQTTClient.h + */ +int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2); + +/** brief Publish relay states to the server + * param[in] client MQTT client + * param[in] inputTime The time when the data is attempt to be sent + * param[in] mode current mode: automatic (0) or manual (1) + * param[in] maxOxi Maximum Oxygen value + * param[in] minOxi Minimum Oxygen value +// * param[in] uploadInterval Interval between upload turns + * retral returnCode from MQTTClient.h + */ +int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi); + +/** brief Upload all the data to the MQTT server + * param[in] client MQTT client + * param[in] inputTime The time when the data is attempt to be sent + * param[in] uploadInterval The period between each upload moment + * retral returnCode from MQTTClient.h + */ +int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct); + +/********************************************************************************************************************************************************************************************/ +/*************************************************************** + * Unity function declarations + ***************************************************************/ +void MQTT_SubscribeCallback(MQTT::MessageData &msgMQTT) { + // Message Handles + char msg[MQTT_MAX_PAYLOAD_SIZE]; + msg[0]='\0'; + strncat (msg, (char*)msgMQTT.message.payload, msgMQTT.message.payloadlen); + printf ("--->>> MQTT_SubscribeCallback msg: %s\n\r", msg); + //{"type":"3","deviceId":"string"} + Json json(msg, msgMQTT.message.payloadlen); + if (!json.isValidJson()) { + printf("Invalid JSON: %s", msg); + } + else { + if (json.type(0) != JSMN_OBJECT ) { + printf("Invalid JSON. ROOT element is not Object: %s", msg); + } + else { + int CommandType; + int KeyIndex = json.findKeyIndexIn("type", 0); + int KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); + int ret = json.tokenIntegerValue(KeyValueIndex, CommandType); + + int receiveCmdID; + printf("Command Type: %d, error %d\r\n", CommandType, ret); + + switch (CommandType) { + case 3: int relayState0, relayState1; + KeyIndex = json.findKeyIndexIn("cmdID", 0); + KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); + ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID); + + KeyIndex = json.findKeyIndexIn("relayState0", 0); + KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); + ret = json.tokenIntegerValue(KeyValueIndex, relayState0); + + KeyIndex = json.findKeyIndexIn("relayState1", 0); + KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); + ret = json.tokenIntegerValue(KeyValueIndex, relayState1); + + DataStruct.RELAY_State_1 = relayState0; + DataStruct.RELAY_State_2 = relayState1; + CE_HandleRelays(relayState0, relayState1); + break; + + case 4: int mode, minOxiVal, maxOxiVal; + KeyIndex = json.findKeyIndexIn("cmdID", 0); + KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); + ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID); + + KeyIndex = json.findKeyIndexIn("mode", 0); + KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); + ret = json.tokenIntegerValue(KeyValueIndex, mode); + + KeyIndex = json.findKeyIndexIn("minOxygenVal", 0); + KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); + ret = json.tokenIntegerValue(KeyValueIndex, minOxiVal); + + KeyIndex = json.findKeyIndexIn("maxOxygenVal", 0); + KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); + ret = json.tokenIntegerValue(KeyValueIndex, maxOxiVal); + + DataStruct.CONFIG_Mode = mode; + DataStruct.CONFIG_MinOxi = minOxiVal; + DataStruct.CONFIG_MaxOxi = maxOxiVal; + //DataStruct.CONFIG_UploadInterval = relayState0; + break; + + default: break; + } + } + } +} + +int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, struct UploadValue uploadStruct) { + return client->subscribe(subscribeTopic, MQTT::QOS1, MQTT_SubscribeCallback); +} + +int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct) { + const char* iot_ibm = MQTT_BROKER_URL; + char hostname[strlen(org) + strlen(iot_ibm) + 1]; + + sprintf(hostname, "%s%s", org, iot_ibm); + // Construct clientId - d:org:type:id + char clientId[strlen(org) + strlen(type) + strlen(id) + 5]; + sprintf(clientId, "d:%s:%s:%s", org, type, id); + sprintf(subscription_url, "%s.%s/#/device/%s/%s/", org, "internetofthings.ibmcloud.com", id, DEFAULT_TYPE_NAME); + + // Network debug statements + LOG("=====================================\n\r"); + LOG("Nucleo IP ADDRESS: %s\n\r", network->get_ip_address()); + LOG("Nucleo MAC ADDRESS: %s\n\r", network->get_mac_address()); + LOG("Server Hostname: %s port: %d\n\r", hostname, IBM_IOT_PORT); + LOG("Client ID: %s\n\r", clientId); + LOG("Topic: %s\n\r",MQTT_EVENT_TOPIC); + LOG("Subscription URL: %s\n\r", subscription_url); + LOG("=====================================\n\r"); + netConnecting = true; + int rc = mqttNetwork->connect(hostname, IBM_IOT_PORT); + if (rc != 0) { + printf("rc from TCP connect is %d\r\n", rc); + return rc; + } + + printf ("--->TCP Connected\n\r"); + netConnected = true; + netConnecting = false; + + // MQTT Connect + mqttConnecting = true; + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + data.MQTTVersion = 4; + data.struct_version = 0; + data.clientID.cstring = clientId; + data.keepAliveInterval = MQTT_KEEPALIVE; // in Sec + data.username.cstring = AUTH_METHOD; + data.password.cstring = auth_token; + printf ("AutToken: %s\n\r", auth_token); + + if ((rc = client->connect(data)) != 0) { + printf("rc from MQTT connect is %d\r\n", rc); + connack_rc = rc; + return rc; + } + connected = true; + printf ("--->MQTT Connected\n\r"); + if ((rc = MQTT_Subscribe(MQTT_COMMAND_TOPIC, client, uploadStruct)) == 0) { + LOG ("--->>>MQTT subscribed to: %s\n\r", MQTT_COMMAND_TOPIC); + } else { + LOG ("--->>>ERROR MQTT subscribe : %s\n\r", MQTT_COMMAND_TOPIC); + } + mqttConnecting = false; + connack_rc = rc; + return rc; +} + + +int MQTT_GetConnTimeout(int attemptNumber) { // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute + // after 20 attempts, retry every 10 minutes + return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600; +} + + +void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct) { + connected = false; + + while (MQTT_Connect(client, mqttNetwork, network, uploadStruct) != MQTT_CONNECTION_ACCEPTED) { + if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD) { + printf ("File: %s, Line: %d Error: %d\n\r",__FILE__,__LINE__, connack_rc); + return; // don't reattempt to connect if credentials are wrong + } + int timeout = MQTT_GetConnTimeout(++retryAttempt); + WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout); + + // if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed + // or maybe just add the proper members to do this disconnect and call MQTT_AttemptConnect(...) + // this works - reset the system when the retry count gets to a threshold + if (retryAttempt == 5) + NVIC_SystemReset(); + else + wait(timeout); + } +} + +int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0) { + MQTT::Message message; + const char* pubTopic = MQTT_EVENT_TOPIC; + + char buf[MQTT_MAX_PAYLOAD_SIZE]; + char timeBuf[50]; + + if (!client->isConnected()) { + printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE; + } + + strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); + sprintf(buf, "{\"type\":1,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"adc0\":%.1f}", + timeBuf, commandID, adcVal_0); + message.qos = MQTT::QOS0; + message.retained = false; + message.dup = false; + message.payload = (void*)buf; + message.payloadlen = strlen(buf); + + if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) + printf("message too long!\r\n"); + + LOG("Publishing %s\n\r", buf); + return client->publish(pubTopic, message); +} + +int MQTT_PublishSensorVal(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float pHVal) { + MQTT::Message message; + const char* pubTopic = MQTT_EVENT_TOPIC; + + char buf[MQTT_MAX_PAYLOAD_SIZE]; + char timeBuf[50]; + + if (!client->isConnected()) { + printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE; + } + + strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); + sprintf(buf, "{\"type\":2,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"pH0\":%.1f}", + timeBuf, commandID, pHVal); + message.qos = MQTT::QOS0; + message.retained = false; + message.dup = false; + message.payload = (void*)buf; + message.payloadlen = strlen(buf); + + if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) + printf("message too long!\r\n"); + + LOG("Publishing %s\n\r", buf); + return client->publish(pubTopic, message); +} + +int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2) { + MQTT::Message message; + const char* pubTopic = MQTT_EVENT_TOPIC; + char buf[MQTT_MAX_PAYLOAD_SIZE]; + char timeBuf[50]; + + if (!client->isConnected()) { + printf ("---> MQTT DISCONNECTED\n\r"); + return MQTT::FAILURE; + } + strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); + sprintf(buf, "{\"type\":3,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"relay1\":%d,\"relay2\":%d}", + timeBuf, commandID, relay1, relay2); + message.qos = MQTT::QOS0; + message.retained = false; + message.dup = false; + message.payload = (void*)buf; + message.payloadlen = strlen(buf); + + if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) + printf("message too long!\r\n"); + + LOG("Publishing %s\n\r", buf); + return client->publish(pubTopic, message); +} + +int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi) { + MQTT::Message message; + const char* pubTopic = MQTT_EVENT_TOPIC; + char buf[MQTT_MAX_PAYLOAD_SIZE]; + char timeBuf[50]; + + if (!client->isConnected()) { + printf ("---> MQTT DISCONNECTED\n\r"); + return MQTT::FAILURE; + } + strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); + sprintf(buf, "{\"type\":4,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"mode\":%d,\"minOxygenVal\":%d,\"maxOxygenVal\":%d}", + timeBuf, commandID, mode, minOxi, maxOxi); + message.qos = MQTT::QOS0; + message.retained = false; + message.dup = false; + message.payload = (void*)buf; + message.payloadlen = strlen(buf); + + if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) + printf("message too long!\r\n"); + + LOG("Publishing %s\n\r", buf); + return client->publish(pubTopic, message); +} + +int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct) { + int retVal; + switch (uploadType) { + case (ADC_VALUE): retVal = MQTT_PublishADC(client, inputTime, uploadStruct.ADC_PHVal); + break; + case (SENSOR_VALUE): retVal = MQTT_PublishSensorVal(client, inputTime, uploadStruct.SENSOR_PHVal); + break; + case (RELAY_STATE): retVal = MQTT_PublishRelayState(client, inputTime, uploadStruct.RELAY_State_1, uploadStruct.RELAY_State_2); + break; + case (CONFIG_VALUE): retVal = MQTT_PublishConfigValue(client, inputTime, uploadStruct.CONFIG_Mode, uploadStruct.CONFIG_MinOxi, uploadStruct.CONFIG_MaxOxi); + break; + default: break; + } + return retVal; +} + +#endif /* __SIMPLEMQTT_H__ */