Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: easy-connect-v16 Watchdog FP MQTTPacket RecordType-v-16 watersenor_and_temp_code
Simple-MQTT/SimpleMQTT.h
- Committer:
- DuyLionTran
- Date:
- 2018-02-27
- Revision:
- 40:4356c209c58d
- Parent:
- 39:a5ee98bd0050
- Child:
- 43:dcde0e66874a
File content as of revision 40:4356c209c58d:
#ifndef __SIMPLEMQTT_H__
#define __SIMPLEMQTT_H__
/***************************************************************
* Includes
***************************************************************/
#include "easy-connect.h"
#include "MQTTClient.h"
#include "NDefLib/NDefNfcTag.h"
#include "NDefLib/RecordType/RecordURI.h"
#include "MQTTNetwork.h"
#include "MQTTmbed.h"
#include "Json.h"
#include "CommandExecution.h"
#include "flash_programming.h"
/***************************************************************
* Definitions
***************************************************************/
// Configuration values needed to connect to IBM IoT Cloud
#define ORG MQTT_ORG_ID // connect to ORG.internetofthings.ibmcloud.com/ For a registered connection, replace with your org
#define ID MQTT_DEVICE_ID // For a registered connection is your device id
#define AUTH_TOKEN MQTT_DEVICE_PASSWORD // For a registered connection is a device auth-token
#define DEFAULT_TYPE_NAME MQTT_DEVICE_TYPE // For a registered connection is device type
#define AUTH_METHOD MQTT_USERNAME
#define TYPE DEFAULT_TYPE_NAME // For a registered connection, replace with your type
#define IBM_IOT_PORT MQTT_PORT
#define MQTT_MAX_PACKET_SIZE 400
#define MQTT_MAX_PAYLOAD_SIZE 300
/***************************************************************
* Variables
***************************************************************/
typedef enum {
ADC_VALUE = 0,
SENSOR_VALUE,
RELAY_STATE,
CONFIG_VALUE
} UploadType;
typedef enum {
CONTROL_CMD = 0,
READ_CMD,
SETUP_CMD
} CommandType;
struct UploadValue {
float ADC_PHVal;
float ADC_DOVal;
float SENSOR_PHVal;
float SENSOR_DOVal;
uint8_t RELAY_State_1;
uint8_t RELAY_State_2;
uint8_t RELAY_State_3;
uint32_t CONFIG_Time;
uint8_t CONFIG_Mode;
uint8_t CONFIG_MinOxi;
uint8_t CONFIG_MaxOxi;
uint16_t CONFIG_UploadInterval;
} UploadValue;
char *projectName = "WaterMonitor";
static char id[30] = ID; // mac without colons
static char org[12] = ORG;
static char type[30] = TYPE;
static char auth_token[30] = AUTH_TOKEN; // Auth_token is only used in non-quickstart mode
static int connack_rc = 0; // MQTT connack return code
static bool wifiConnected = true;
static bool netConnecting = false;
static bool mqttConnecting = false;
static bool netConnected = false;
static bool connected = false;
static int retryAttempt = 0;
static int connectTimeout = 1000;
uint16_t commandID = 0;
static char subscription_url[MQTT_MAX_PAYLOAD_SIZE];
extern struct UploadValue DataStruct;
/***************************************************************
* Unity function definitions
***************************************************************/
void MQTT_MessageHandles(uint8_t ControlSignal);
/** brief Callback function when MQTT message arrives
* param[in] msgMQTT
* retral None
*/
void MQTT_SubscribeCallback(MQTT::MessageData &msgMQTT);
/** brief Subscribe to a MQTT topic and set the MQTT callback function
* param[in] subscribeTopic Topic to be subscribed
* param[in] client MQTT client
* retral returnCode from MQTTClient.h
*/
int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, struct UploadValue uploadStruct);
/** brief Connect to the internet then the MQTT network
* param[in] client MQTT client
* param[in] mqttNetwork MQTT network
* param[in] network The internet network interface (ethernet, wifi...)
* retral Internet connect result and returnCode from MQTTClient.h
*/
int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct);
/** brief Setup the number of attempt to re-connect to the internet
* param[in] attemptNumber The number of attemp
*/
int MQTT_GetConnTimeout(int attemptNumber);
/** brief Try to reconnect to the internet and MQTT network
* retral None
*/
void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct);
/** brief Publish ADC values to the server
* param[in] client MQTT client
* param[in] inputTime The time when the data is attempt to be sent
* param[in] adcVal_0 The ADC value to be sent
* retral returnCode from MQTTClient.h
*/
int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0);
/** brief Publish Sensor values to the server
* param[in] client MQTT client
* param[in] inputTime The time when the data is attempt to be sent
* param[in] pHVal The pHVal value to be sent
* retral returnCode from MQTTClient.h
*/
int MQTT_PublishSensorVal(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float DOVal);
/** brief Publish relay states to the server
* param[in] client MQTT client
* param[in] inputTime The time when the data is attempt to be sent
* param[in] relay1 Relay 1 state
* param[in] relay2 Relay 2 state
* retral returnCode from MQTTClient.h
*/
int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2, int relay3);
/** brief Publish relay states to the server
* param[in] client MQTT client
* param[in] inputTime The time when the data is attempt to be sent
* param[in] mode current mode: automatic (0) or manual (1)
* param[in] maxOxi Maximum Oxygen value
* param[in] minOxi Minimum Oxygen value
// * param[in] uploadInterval Interval between upload turns
* retral returnCode from MQTTClient.h
*/
int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi);
/** brief Upload all the data to the MQTT server
* param[in] client MQTT client
* param[in] inputTime The time when the data is attempt to be sent
* param[in] uploadInterval The period between each upload moment
* retral returnCode from MQTTClient.h
*/
int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct);
/********************************************************************************************************************************************************************************************/
/***************************************************************
* Unity function declarations
***************************************************************/
void MQTT_SubscribeCallback(MQTT::MessageData &msgMQTT) {
// Message Handles
char msg[MQTT_MAX_PAYLOAD_SIZE];
msg[0]='\0';
strncat (msg, (char*)msgMQTT.message.payload, msgMQTT.message.payloadlen);
printf ("--->>> MQTT_SubscribeCallback msg: %s\n\r", msg);
/* {"type":"3","deviceId":"string"} */
Json json(msg, msgMQTT.message.payloadlen);
if (!json.isValidJson()) {
printf("Invalid JSON: %s", msg);
}
else {
if (json.type(0) != JSMN_OBJECT ) {
printf("Invalid JSON. ROOT element is not Object: %s", msg);
}
else {
int CommandType;
int KeyIndex = json.findKeyIndexIn("type", 0);
int KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
int ret = json.tokenIntegerValue(KeyValueIndex, CommandType);
int receiveCmdID;
printf("Command Type: %d, error %d\r\n", CommandType, ret);
switch (CommandType) {
case CONTROL_CMD: CE_Calibrate();
break;
case 3: int relayState1, relayState2, relayState3;
KeyIndex = json.findKeyIndexIn("cmdID", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID);
KeyIndex = json.findKeyIndexIn("relayState1", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, relayState1);
KeyIndex = json.findKeyIndexIn("relayState2", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, relayState2);
KeyIndex = json.findKeyIndexIn("relayState3", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, relayState3);
DataStruct.RELAY_State_1 = relayState1;
DataStruct.RELAY_State_2 = relayState2;
DataStruct.RELAY_State_3 = relayState3;
FP_WriteRelayStates(DataStruct.RELAY_State_1, DataStruct.RELAY_State_2, DataStruct.RELAY_State_3);
CE_HandleRelays(relayState1, relayState2, relayState3);
break;
case 4: int mode, minOxiVal, maxOxiVal, uploadInterval, setRTCTime;
KeyIndex = json.findKeyIndexIn("cmdID", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID);
KeyIndex = json.findKeyIndexIn("mode", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, mode);
KeyIndex = json.findKeyIndexIn("minOxygenVal", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, minOxiVal);
KeyIndex = json.findKeyIndexIn("maxOxygenVal", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, maxOxiVal);
KeyIndex = json.findKeyIndexIn("uploadInterval", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, uploadInterval);
KeyIndex = json.findKeyIndexIn("setRTCTime", 0);
KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
ret = json.tokenIntegerValue(KeyValueIndex, setRTCTime);
DataStruct.CONFIG_Mode = mode;
DataStruct.CONFIG_MinOxi = minOxiVal;
DataStruct.CONFIG_MaxOxi = maxOxiVal;
DataStruct.CONFIG_UploadInterval = uploadInterval;
FP_WriteConfigValues(DataStruct.CONFIG_Mode, DataStruct.CONFIG_MinOxi, DataStruct.CONFIG_MaxOxi, DataStruct.CONFIG_UploadInterval);
CE_SetRTCTime(setRTCTime);
break;
default: break;
}
}
}
}
int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, struct UploadValue uploadStruct) {
return client->subscribe(subscribeTopic, MQTT::QOS1, MQTT_SubscribeCallback);
}
int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct) {
const char* iot_ibm = MQTT_BROKER_URL;
char hostname[strlen(org) + strlen(iot_ibm) + 1];
sprintf(hostname, "%s%s", org, iot_ibm);
// Construct clientId - d:org:type:id
char clientId[strlen(org) + strlen(type) + strlen(id) + 5];
sprintf(clientId, "d:%s:%s:%s", org, type, id);
sprintf(subscription_url, "%s.%s/#/device/%s/%s/", org, "internetofthings.ibmcloud.com", id, DEFAULT_TYPE_NAME);
// Network debug statements
LOG("=====================================\n\r");
LOG("Nucleo IP ADDRESS: %s\n\r", network->get_ip_address());
LOG("Nucleo MAC ADDRESS: %s\n\r", network->get_mac_address());
LOG("Server Hostname: %s port: %d\n\r", hostname, IBM_IOT_PORT);
LOG("Client ID: %s\n\r", clientId);
LOG("Topic: %s\n\r",MQTT_EVENT_TOPIC);
LOG("Subscription URL: %s\n\r", subscription_url);
LOG("=====================================\n\r");
netConnecting = true;
int rc = mqttNetwork->connect(hostname, IBM_IOT_PORT);
if (rc != 0) {
printf("rc from TCP connect is %d\r\n", rc);
return rc;
}
printf ("--->TCP Connected\n\r");
netConnected = true;
netConnecting = false;
// MQTT Connect
mqttConnecting = true;
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.MQTTVersion = 4;
data.struct_version = 0;
data.clientID.cstring = clientId;
data.keepAliveInterval = MQTT_KEEPALIVE; // in Sec
data.username.cstring = AUTH_METHOD;
data.password.cstring = auth_token;
printf ("AutToken: %s\n\r", auth_token);
if ((rc = client->connect(data)) != 0) {
printf("rc from MQTT connect is %d\r\n", rc);
connack_rc = rc;
return rc;
}
connected = true;
printf ("--->MQTT Connected\n\r");
if ((rc = MQTT_Subscribe(MQTT_COMMAND_TOPIC, client, uploadStruct)) == 0) {
LOG ("--->>>MQTT subscribed to: %s\n\r", MQTT_COMMAND_TOPIC);
} else {
LOG ("--->>>ERROR MQTT subscribe : %s\n\r", MQTT_COMMAND_TOPIC);
}
mqttConnecting = false;
connack_rc = rc;
return rc;
}
int MQTT_GetConnTimeout(int attemptNumber) { // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute
// after 20 attempts, retry every 10 minutes
return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600;
}
void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct) {
connected = false;
while (MQTT_Connect(client, mqttNetwork, network, uploadStruct) != MQTT_CONNECTION_ACCEPTED) {
if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD) {
printf ("File: %s, Line: %d Error: %d\n\r",__FILE__,__LINE__, connack_rc);
return; // don't reattempt to connect if credentials are wrong
}
int timeout = MQTT_GetConnTimeout(++retryAttempt);
WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout);
// if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed
// or maybe just add the proper members to do this disconnect and call MQTT_AttemptConnect(...)
// this works - reset the system when the retry count gets to a threshold
if (retryAttempt == 5)
NVIC_SystemReset();
else
wait(timeout);
}
}
int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0) {
MQTT::Message message;
const char* pubTopic = MQTT_EVENT_TOPIC;
char buf[MQTT_MAX_PAYLOAD_SIZE];
char timeBuf[50];
if (!client->isConnected()) {
printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE;
}
strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime));
sprintf(buf, "{\"type\":1,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"adc0\":%.1f}",
timeBuf, commandID, adcVal_0);
message.qos = MQTT::QOS0;
message.retained = false;
message.dup = false;
message.payload = (void*)buf;
message.payloadlen = strlen(buf);
if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE)
printf("message too long!\r\n");
LOG("Publishing %s\n\r", buf);
return client->publish(pubTopic, message);
}
int MQTT_PublishSensorVal(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float DOVal) {
MQTT::Message message;
const char* pubTopic = MQTT_EVENT_TOPIC;
char buf[MQTT_MAX_PAYLOAD_SIZE];
char timeBuf[50];
if (!client->isConnected()) {
printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE;
}
strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime));
sprintf(buf, "{\"type\":2,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"DO(ppm)\":%.2f}",
timeBuf, commandID, DOVal);
message.qos = MQTT::QOS0;
message.retained = false;
message.dup = false;
message.payload = (void*)buf;
message.payloadlen = strlen(buf);
if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE)
printf("message too long!\r\n");
LOG("Publishing %s\n\r", buf);
return client->publish(pubTopic, message);
}
int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2, int relay3) {
MQTT::Message message;
const char* pubTopic = MQTT_EVENT_TOPIC;
char buf[MQTT_MAX_PAYLOAD_SIZE];
char timeBuf[50];
if (!client->isConnected()) {
printf ("---> MQTT DISCONNECTED\n\r");
return MQTT::FAILURE;
}
strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime));
sprintf(buf, "{\"type\":3,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"relay1\":%d,\"relay2\":%d,\"relay3\":%d}",
timeBuf, commandID, relay1, relay2, relay3);
message.qos = MQTT::QOS0;
message.retained = false;
message.dup = false;
message.payload = (void*)buf;
message.payloadlen = strlen(buf);
if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE)
printf("message too long!\r\n");
LOG("Publishing %s\n\r", buf);
return client->publish(pubTopic, message);
}
int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi) {
MQTT::Message message;
const char* pubTopic = MQTT_EVENT_TOPIC;
char buf[MQTT_MAX_PAYLOAD_SIZE];
char timeBuf[50];
if (!client->isConnected()) {
printf ("---> MQTT DISCONNECTED\n\r");
return MQTT::FAILURE;
}
strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime));
sprintf(buf, "{\"type\":4,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"mode\":%d,\"minOxygenVal\":%d,\"maxOxygenVal\":%d}",
timeBuf, commandID, mode, minOxi, maxOxi);
message.qos = MQTT::QOS0;
message.retained = false;
message.dup = false;
message.payload = (void*)buf;
message.payloadlen = strlen(buf);
if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE)
printf("message too long!\r\n");
LOG("Publishing %s\n\r", buf);
return client->publish(pubTopic, message);
}
int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct) {
int retVal;
switch (uploadType) {
case (ADC_VALUE): retVal = MQTT_PublishADC(client, inputTime, uploadStruct.ADC_DOVal);
break;
case (SENSOR_VALUE): retVal = MQTT_PublishSensorVal(client, inputTime, uploadStruct.SENSOR_DOVal);
break;
case (RELAY_STATE): retVal = MQTT_PublishRelayState(client, inputTime, uploadStruct.RELAY_State_1, uploadStruct.RELAY_State_2, uploadStruct.RELAY_State_3);
break;
case (CONFIG_VALUE): retVal = MQTT_PublishConfigValue(client, inputTime, uploadStruct.CONFIG_Mode, uploadStruct.CONFIG_MinOxi, uploadStruct.CONFIG_MaxOxi);
break;
default: break;
}
return retVal;
}
#endif /* __SIMPLEMQTT_H__ */