Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: easy-connect-v16 Watchdog FP MQTTPacket RecordType-v-16 watersenor_and_temp_code
Revision 24:3de3cf978e60, committed 2017-12-29
- Comitter:
- DuyLionTran
- Date:
- Fri Dec 29 19:13:03 2017 +0000
- Parent:
- 23:65cd2b5c76d5
- Child:
- 25:cf7a3e31622a
- Commit message:
- version 1.8: publish 4 messages in sequence
Changed in this revision
| Simple-MQTT/MQTTNetwork.h | Show annotated file Show diff for this revision Revisions of this file |
| Simple-MQTT/SimpleMQTT.h | Show annotated file Show diff for this revision Revisions of this file |
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/Simple-MQTT/MQTTNetwork.h Fri Dec 29 19:13:03 2017 +0000
@@ -0,0 +1,40 @@
+#ifndef _MQTTNETWORK_H_
+#define _MQTTNETWORK_H_
+
+#include "NetworkInterface.h"
+
+class MQTTNetwork {
+public:
+ MQTTNetwork(NetworkInterface* aNetwork) : network(aNetwork) {
+ socket = new TCPSocket();
+ }
+
+ ~MQTTNetwork() {
+ delete socket;
+ }
+
+ int read(unsigned char* buffer, int len, int timeout) {
+ socket->set_timeout(timeout);
+ return socket->recv(buffer, len);
+ }
+
+ int write(unsigned char* buffer, int len, int timeout) {
+ socket->set_timeout(timeout);
+ return socket->send(buffer, len);
+ }
+
+ int connect(const char* hostname, int port) {
+ socket->open(network);
+ return socket->connect(hostname, port);
+ }
+
+ int disconnect() {
+ return socket->close();
+ }
+
+private:
+ NetworkInterface* network;
+ TCPSocket* socket;
+};
+
+#endif // _MQTTNETWORK_H_
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/Simple-MQTT/SimpleMQTT.h Fri Dec 29 19:13:03 2017 +0000
@@ -0,0 +1,337 @@
+#ifndef __SIMPLEMQTT_H__
+#define __SIMPLEMQTT_H__
+
+/***************************************************************
+ * Includes
+ ***************************************************************/
+#include "easy-connect.h"
+#include "MQTTClient.h"
+#include "NDefLib/NDefNfcTag.h"
+#include "NDefLib/RecordType/RecordURI.h"
+#include "MQTTNetwork.h"
+#include "MQTTmbed.h"
+
+/***************************************************************
+ * Definitions
+ ***************************************************************/
+ // Configuration values needed to connect to IBM IoT Cloud
+#define ORG MQTT_ORG_ID // connect to ORG.internetofthings.ibmcloud.com/ For a registered connection, replace with your org
+#define ID MQTT_DEVICE_ID // For a registered connection is your device id
+#define AUTH_TOKEN MQTT_DEVICE_PASSWORD // For a registered connection is a device auth-token
+#define DEFAULT_TYPE_NAME MQTT_DEVICE_TYPE // For a registered connection is device type
+#define AUTH_METHOD MQTT_USERNAME
+
+#define TYPE DEFAULT_TYPE_NAME // For a registered connection, replace with your type
+#define IBM_IOT_PORT MQTT_PORT
+
+#define MQTT_MAX_PACKET_SIZE 400
+#define MQTT_MAX_PAYLOAD_SIZE 300
+
+/***************************************************************
+ * Variables
+ ***************************************************************/
+typedef enum {
+ ADC_VALUE = 0,
+ SENSOR_VALUE,
+ RELAY_STATE,
+ CONFIG_VALUE
+} UploadType;
+
+struct UploadValue {
+ float ADC_PHVal;
+ float ADC_DOVal;
+
+ int RELAY_State_1;
+ int RELAY_State_2;
+
+ uint8_t CONFIG_Mode;
+ uint8_t CONFIG_MinOxi;
+ uint8_t CONFIG_MaxOxi;
+ uint16_t CONFIG_UploadInterval;
+} UploadValue;
+
+char *projectName = "WaterMonitor";
+static char id[30] = ID; // mac without colons
+static char org[12] = ORG;
+static char type[30] = TYPE;
+static char auth_token[30] = AUTH_TOKEN; // Auth_token is only used in non-quickstart mode
+static int connack_rc = 0; // MQTT connack return code
+static bool netConnecting = false;
+static bool mqttConnecting = false;
+static bool netConnected = false;
+static bool connected = false;
+static int retryAttempt = 0;
+static int connectTimeout = 1000;
+uint16_t commandID = 0;
+static char subscription_url[MQTT_MAX_PAYLOAD_SIZE];
+
+/***************************************************************
+ * Unity function definitions
+ ***************************************************************/
+/** brief Callback function when MQTT message arrives
+ * param[in] msgMQTT
+ * retral None
+ */
+void MQTT_SubscribeCallback(MQTT::MessageData & msgMQTT);
+
+/** brief Subscribe to a MQTT topic and set the MQTT callback function
+ * param[in] subscribeTopic Topic to be subscribed
+ * param[in] client MQTT client
+ * retral returnCode from MQTTClient.h
+ */
+int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client);
+
+/** brief Connect to the internet then the MQTT network
+ * param[in] client MQTT client
+ * param[in] mqttNetwork MQTT network
+ * param[in] network The internet network interface (ethernet, wifi...)
+ * retral Internet connect result and returnCode from MQTTClient.h
+ */
+int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network);
+
+/** brief Setup the number of attempt to re-connect to the internet
+ * param[in] attemptNumber The number of attemp
+ */
+int MQTT_GetConnTimeout(int attemptNumber);
+
+/** brief Try to reconnect to the internet and MQTT network
+ * retral None
+ */
+void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network);
+
+/** brief Publish ADC values to the server
+ * param[in] client MQTT client
+ * param[in] inputTime The time when the data is attempt to be sent
+ * param[in] adcVal_0 The ADC value to be sent
+ * retral returnCode from MQTTClient.h
+ */
+int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0);
+
+/** brief Publish relay states to the server
+ * param[in] client MQTT client
+ * param[in] inputTime The time when the data is attempt to be sent
+ * param[in] relay1 Relay 1 state
+ * param[in] relay2 Relay 2 state
+ * retral returnCode from MQTTClient.h
+ */
+int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2);
+
+/** brief Publish relay states to the server
+ * param[in] client MQTT client
+ * param[in] inputTime The time when the data is attempt to be sent
+ * param[in] mode current mode: automatic (0) or manual (1)
+ * param[in] maxOxi Maximum Oxygen value
+ * param[in] minOxi Minimum Oxygen value
+// * param[in] uploadInterval Interval between upload turns
+ * retral returnCode from MQTTClient.h
+ */
+int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi);
+
+/** brief Upload all the data to the MQTT server
+ * param[in] client MQTT client
+ * param[in] inputTime The time when the data is attempt to be sent
+ * param[in] uploadInterval The period between each upload moment
+ * retral returnCode from MQTTClient.h
+ */
+int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct);
+
+/********************************************************************************************************************************************************************************************/
+/***************************************************************
+ * Unity function declarations
+ ***************************************************************/
+void MQTT_SubscribeCallback(MQTT::MessageData & msgMQTT) {
+ char msg[MQTT_MAX_PAYLOAD_SIZE];
+ msg[0]='\0';
+ strncat (msg, (char*)msgMQTT.message.payload, msgMQTT.message.payloadlen);
+ printf ("--->>> MQTT_SubscribeCallback msg: %s\n\r", msg);
+}
+
+int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client) {
+ return client->subscribe(subscribeTopic, MQTT::QOS1, MQTT_SubscribeCallback);
+}
+
+int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network) {
+ const char* iot_ibm = MQTT_BROKER_URL;
+ char hostname[strlen(org) + strlen(iot_ibm) + 1];
+
+ sprintf(hostname, "%s%s", org, iot_ibm);
+ // Construct clientId - d:org:type:id
+ char clientId[strlen(org) + strlen(type) + strlen(id) + 5];
+ sprintf(clientId, "d:%s:%s:%s", org, type, id);
+ sprintf(subscription_url, "%s.%s/#/device/%s/%s/", org, "internetofthings.ibmcloud.com", id, DEFAULT_TYPE_NAME);
+
+ // Network debug statements
+ LOG("=====================================\n\r");
+ LOG("Nucleo IP ADDRESS: %s\n\r", network->get_ip_address());
+ LOG("Nucleo MAC ADDRESS: %s\n\r", network->get_mac_address());
+ LOG("Server Hostname: %s port: %d\n\r", hostname, IBM_IOT_PORT);
+ LOG("Client ID: %s\n\r", clientId);
+ LOG("Topic: %s\n\r",MQTT_EVENT_TOPIC);
+ LOG("Subscription URL: %s\n\r", subscription_url);
+ LOG("=====================================\n\r");
+ netConnecting = true;
+ int rc = mqttNetwork->connect(hostname, IBM_IOT_PORT);
+ if (rc != 0) {
+ printf("rc from TCP connect is %d\r\n", rc);
+ return rc;
+ }
+
+ printf ("--->TCP Connected\n\r");
+ netConnected = true;
+ netConnecting = false;
+
+ // MQTT Connect
+ mqttConnecting = true;
+ MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
+ data.MQTTVersion = 4;
+ data.struct_version = 0;
+ data.clientID.cstring = clientId;
+ data.keepAliveInterval = MQTT_KEEPALIVE; // in Sec
+ data.username.cstring = AUTH_METHOD;
+ data.password.cstring = auth_token;
+ printf ("AutToken: %s\n\r", auth_token);
+
+ if ((rc = client->connect(data)) != 0) {
+ printf("rc from MQTT connect is %d\r\n", rc);
+ connack_rc = rc;
+ return rc;
+ }
+ connected = true;
+ printf ("--->MQTT Connected\n\r");
+ if ((rc = MQTT_Subscribe(MQTT_COMMAND_TOPIC, client)) == 0) {
+ LOG ("--->>>MQTT subscribed to: %s\n\r", MQTT_COMMAND_TOPIC);
+ } else {
+ LOG ("--->>>ERROR MQTT subscribe : %s\n\r", MQTT_COMMAND_TOPIC);
+ }
+ mqttConnecting = false;
+ connack_rc = rc;
+ return rc;
+}
+
+
+int MQTT_GetConnTimeout(int attemptNumber) { // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute
+ // after 20 attempts, retry every 10 minutes
+ return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600;
+}
+
+
+void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network) {
+ connected = false;
+
+ while (MQTT_Connect(client, mqttNetwork, network) != MQTT_CONNECTION_ACCEPTED) {
+ if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD) {
+ printf ("File: %s, Line: %d Error: %d\n\r",__FILE__,__LINE__, connack_rc);
+ return; // don't reattempt to connect if credentials are wrong
+ }
+ int timeout = MQTT_GetConnTimeout(++retryAttempt);
+ WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout);
+
+ // if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed
+ // or maybe just add the proper members to do this disconnect and call MQTT_AttemptConnect(...)
+ // this works - reset the system when the retry count gets to a threshold
+ if (retryAttempt == 5)
+ NVIC_SystemReset();
+ else
+ wait(timeout);
+ }
+}
+
+int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0) {
+ MQTT::Message message;
+ const char* pubTopic = MQTT_EVENT_TOPIC;
+
+ char buf[MQTT_MAX_PAYLOAD_SIZE];
+ char timeBuf[50];
+
+ if (!client->isConnected()) {
+ printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE;
+ }
+
+ strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime));
+// sprintf(buf,
+// "{\"Project\":\"%s\",\"Time\":\"%s\",\"Type\":1,\"cmdID\":%d,\"ADC0\":%0.2f}",
+// projectName, timeBuf, commandID, adcVal_0);
+ sprintf(buf, "{\"type\":1,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"adc0\":%0.2f}",
+ timeBuf, commandID, adcVal_0);
+ message.qos = MQTT::QOS0;
+ message.retained = false;
+ message.dup = false;
+ message.payload = (void*)buf;
+ message.payloadlen = strlen(buf);
+
+ if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE)
+ printf("message too long!\r\n");
+
+ LOG("Publishing %s\n\r", buf);
+ return client->publish(pubTopic, message);
+}
+
+int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2) {
+ MQTT::Message message;
+ const char* pubTopic = MQTT_EVENT_TOPIC;
+ char buf[MQTT_MAX_PAYLOAD_SIZE];
+ char timeBuf[50];
+
+ if (!client->isConnected()) {
+ printf ("---> MQTT DISCONNECTED\n\r");
+ return MQTT::FAILURE;
+ }
+ strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime));
+ sprintf(buf, "{\"type\":3,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"relay1\":%d,\"relay2\":%d}",
+ timeBuf, commandID, relay1, relay2);
+ message.qos = MQTT::QOS0;
+ message.retained = false;
+ message.dup = false;
+ message.payload = (void*)buf;
+ message.payloadlen = strlen(buf);
+
+ if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE)
+ printf("message too long!\r\n");
+
+ LOG("Publishing %s\n\r", buf);
+ return client->publish(pubTopic, message);
+}
+
+int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi) {
+ MQTT::Message message;
+ const char* pubTopic = MQTT_EVENT_TOPIC;
+ char buf[MQTT_MAX_PAYLOAD_SIZE];
+ char timeBuf[50];
+
+ if (!client->isConnected()) {
+ printf ("---> MQTT DISCONNECTED\n\r");
+ return MQTT::FAILURE;
+ }
+ strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime));
+ sprintf(buf, "{\"type\":4,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"mode\":%d,\"minOxygenVal\":%d,\"maxOxygenVal\":%d}",
+ timeBuf, commandID, mode, minOxi, maxOxi);
+ message.qos = MQTT::QOS0;
+ message.retained = false;
+ message.dup = false;
+ message.payload = (void*)buf;
+ message.payloadlen = strlen(buf);
+
+ if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE)
+ printf("message too long!\r\n");
+
+ LOG("Publishing %s\n\r", buf);
+ return client->publish(pubTopic, message);
+}
+
+int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct) {
+ int retVal;
+ switch (uploadType) {
+ case (ADC_VALUE): retVal = MQTT_PublishADC(client, inputTime, uploadStruct.ADC_PHVal);
+ break;
+ case (SENSOR_VALUE): retVal = MQTT::SUCCESS;
+ break;
+ case (RELAY_STATE): retVal = MQTT_PublishRelayState(client, inputTime, uploadStruct.RELAY_State_1, uploadStruct.RELAY_State_2);
+ break;
+ case (CONFIG_VALUE): retVal = MQTT_PublishConfigValue(client, inputTime, uploadStruct.CONFIG_Mode, uploadStruct.CONFIG_MinOxi, uploadStruct.CONFIG_MaxOxi);
+ break;
+ default: break;
+ }
+ return retVal;
+}
+
+#endif /* __SIMPLEMQTT_H__ */