Provides Javascript wrappers for MQTT.
Dependencies: mbed-http DEVI2C_JS MQTTPacket FP
Diff: MQTT_JS.cpp
- Revision:
- 0:f4dbe435e64c
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTT_JS.cpp Wed Jan 17 11:30:51 2018 +0100
@@ -0,0 +1,352 @@
+/*
+ * @file MQTT_JS.cpp
+ * @author ST
+ * @version V1.0.0
+ * @date 9 October 2017
+ * @brief Implementation of MQTT for Javascript.
+ ******************************************************************************
+ * @attention
+ *
+ * <h2><center>© COPYRIGHT(c) 2017 STMicroelectronics</center></h2>
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of STMicroelectronics nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ ******************************************************************************
+ */
+
+
+/* Includes ------------------------------------------------------------------*/
+
+#include "MQTT_JS.h"
+
+/** onSubscribeCallback
+ * @brief onSubscribeCallback.
+ */
+jerry_value_t MQTT_JS::onSubscribeCallback;
+
+/** Constructor
+ * @brief Constructor.
+ */
+MQTT_JS::MQTT_JS(){
+ connack_rc = 0; // MQTT connack return code
+ ip_addr = NULL;
+ netConnecting = false;
+ connectTimeout = 1000;
+ mqttConnecting = false;
+ netConnected = false;
+ connected = false;
+ retryAttempt = 0;
+
+ client = NULL;
+ mqttNetwork = NULL;
+
+ onSubscribeCallback = NULL;
+
+}
+
+/** Destructor
+ * @brief Destructor.
+ */
+MQTT_JS::~MQTT_JS(){
+ if(client){
+ delete client;
+ client = NULL;
+ }
+ if(mqttNetwork){
+ delete mqttNetwork;
+ mqttNetwork = NULL;
+ }
+}
+
+/** subscribe_cb
+ * @brief Connects the subscription callback.
+ * @param Message Data
+ */
+void MQTT_JS::subscribe_cb(MQTT::MessageData & msgMQTT) {
+ char msg[MQTT_MAX_PAYLOAD_SIZE];
+ msg[0]='\0';
+ strncat (msg, (char*)msgMQTT.message.payload, msgMQTT.message.payloadlen);
+ //printf ("--->>> subscribe_cb msg: %s\n\r", msg);
+
+ if (onSubscribeCallback && jerry_value_is_function(onSubscribeCallback)) {
+
+ jerry_value_t this_val = jerry_create_undefined ();
+ const jerry_value_t args[1] = {
+ jerry_create_string ((const jerry_char_t *)msg)
+ //jerry_create_number(3)
+ };
+
+ jerry_value_t ret_val = jerry_call_function (onSubscribeCallback, this_val, args, 1);
+
+ if (!jerry_value_has_error_flag (ret_val))
+ {
+ // handle return value
+ }
+ jerry_release_value(args[0]);
+
+ jerry_release_value (ret_val);
+ jerry_release_value (this_val);
+
+ }
+}
+
+/** onSubscribe
+ * @brief Calls the subscription callback.
+ * @param Jerry Callback
+ * @return Return code
+ */
+int MQTT_JS::onSubscribe(jerry_value_t cb){
+
+ if (jerry_value_is_function(cb)) {
+ onSubscribeCallback = cb;
+ return 0;
+ }
+ return 1;
+}
+
+/** subscribe
+ * @brief Subscribes to the topic.
+ * @param Topic
+ * @return Return code
+ */
+int MQTT_JS::subscribe (char *_topic)
+{
+ strcpy(topic, _topic);
+ if(!topic){
+ return 1; // invalid topic
+ }
+ return client->subscribe(topic, MQTT::QOS1, subscribe_cb);
+}
+
+/** unsubscribe
+ * @brief Unsubscribes the callback
+ * @param Topic
+ * @return Return code
+ */
+int MQTT_JS::unsubscribe(char *pubTopic)
+{
+ return client->unsubscribe(pubTopic);
+}
+
+
+/** init
+ * @brief Initializes the MQTT.
+ * @param NetworkInterface
+ * @param ID
+ * @param Token
+ * @param URL
+ * @param Port
+ * @return Return code
+ */
+int MQTT_JS::init(NetworkInterface* network, char* _id, char* _token, char* _url, char* _port)
+{
+ sprintf (id, "%s", _id);
+ sprintf (auth_token, "%s", _token);
+ sprintf (hostname, "%s", _url);
+ sprintf (subscription_url, "%s", _url);
+ sprintf (port, "%s", _port);
+
+ if (!network) {
+ printf ("Error easy_connect\n\r");
+ return -1;
+ }
+
+ mqttNetwork = new MQTTNetwork(network);
+
+ client = new MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>(*mqttNetwork);
+
+ return 0;
+}
+
+/** connect
+ * @brief Connects to the MQTT Server.
+ * @param NetworkInterface
+ * @return Return code
+ */
+int MQTT_JS::connect(NetworkInterface* network)
+{
+ netConnecting = true;
+ int rc = mqttNetwork->connect(hostname, atoi(port));
+ if (rc != 0)
+ {
+ //WARN("IP Stack connect returned: %d\n", rc);
+ return rc;
+ }
+ printf ("--->TCP Connected\n\r");
+ netConnected = true;
+ netConnecting = false;
+
+ // MQTT Connect
+ mqttConnecting = true;
+ MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
+ data.MQTTVersion = 4;
+ data.struct_version=0;
+ data.clientID.cstring = id;
+ data.username.cstring = id;
+ data.password.cstring = auth_token;
+ data.keepAliveInterval = 15; // in Sec
+ if ((rc = client->connect(data)) == 0)
+ {
+ connected = true;
+ printf ("--->MQTT Connected\n\r");
+ }
+ else {
+ WARN("MQTT connect returned %d\n", rc);
+ }
+ if (rc >= 0)
+ connack_rc = rc;
+ mqttConnecting = false;
+ return rc;
+}
+
+/** getConnTimeout
+ * @brief Returns the timeout in seconds.
+ * @param Attempt number
+ * @return Return code
+ */
+int MQTT_JS::getConnTimeout(int attemptNumber)
+{
+ // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute
+ // after 20 attempts, retry every 10 minutes
+ return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600;
+}
+
+/** attemptConnect
+ * @brief Attempt connection to MQTT server.
+ * @param NetworkInterface
+ */
+void MQTT_JS::attemptConnect(NetworkInterface* network)
+{
+ connected = false;
+
+ while (connect(network) != MQTT_CONNECTION_ACCEPTED)
+ {
+ if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD) {
+ printf ("File: %s, Line: %d Error: %d\n\r",__FILE__,__LINE__, connack_rc);
+ return; // don't reattempt to connect if credentials are wrong
+ }
+ int timeout = getConnTimeout(++retryAttempt);
+ WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout);
+
+ // if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed
+ // or maybe just add the proper members to do this disconnect and call attemptConnect(...)
+
+ // this works - reset the system when the retry count gets to a threshold
+ if (retryAttempt == 5)
+ NVIC_SystemReset();
+ else
+ wait(timeout);
+ }
+}
+
+/** publish
+ * @brief Publishes to the MQTT broker.
+ * @param Data
+ * @param Optional: retry number
+ * @return Return code
+ */
+int MQTT_JS::publish(char* buf, int n)
+{
+ MQTT::Message message;
+ message.qos = MQTT::QOS0;
+ message.retained = false;
+ message.dup = false;
+ message.payload = (void*)buf;
+ message.payloadlen = strlen(buf);
+
+ //LOG("Publishing %s\n\r", buf);
+ int result = client->publish(topic, message);
+ if(result != 0){
+ if(n < 2){
+ printf("\33[31mCould not publish message. Trying again...\33[0m\n");
+ return publish(buf, n+1);
+ }
+ else{
+ printf("\33[31mError publishing message!\33[0m\n");
+ return result;
+ }
+ }
+
+ /*
+ if(result == 0){
+ client->yield(5000); // allow the MQTT client to receive messages
+ }
+ */
+ return result;
+}
+
+
+/** yield
+ * @brief Waits for the MQTT broker for subscription callback.
+ * @param Time to wait
+ * @return Return code
+ */
+int MQTT_JS::yield(int time)
+{
+ client->yield(time); // allow the MQTT client to receive messages
+ return 0;
+}
+
+/** start_mqtt
+ * @brief Starts a demo for MQTT.
+ * @param NetworkInterface
+ * @return Return code
+ */
+int MQTT_JS::start_mqtt(NetworkInterface* network)
+{
+ sprintf (id, "hsojbpev");
+ sprintf (auth_token, "4H5vbg1KAhYi");
+ sprintf (hostname, "m20.cloudmqtt.com");
+ sprintf (subscription_url, "m20.cloudmqtt.com");
+ sprintf (port, "10023");
+
+ if (!network) {
+ printf ("Error easy_connect\n\r");
+ return -1;
+ }
+
+ mqttNetwork = new MQTTNetwork(network);
+
+ client = new MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>(*mqttNetwork);
+
+ attemptConnect(network);
+ if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD)
+ {
+ while (true)
+ wait(1.0); // Permanent failures - don't retry
+ }
+
+ int count = 0;
+ while (true)
+ {
+ if (++count == 6)
+ {
+ // Publish a message every ~3 second
+ if (publish((char*)"TestTest") != 0) {
+ attemptConnect(network); // if we have lost the connection
+ }
+ count = 0;
+ }
+ client->yield(500); // allow the MQTT client to receive messages
+ }
+}