Provides Javascript wrappers for MQTT.

Dependencies:   mbed-http DEVI2C_JS MQTTPacket FP

Revision:
0:f4dbe435e64c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTT_JS.cpp	Wed Jan 17 11:30:51 2018 +0100
@@ -0,0 +1,352 @@
+/*
+ * @file    MQTT_JS.cpp
+ * @author  ST
+ * @version V1.0.0
+ * @date    9 October 2017
+ * @brief   Implementation of MQTT for Javascript.
+ ******************************************************************************
+ * @attention
+ *
+ * <h2><center>&copy; COPYRIGHT(c) 2017 STMicroelectronics</center></h2>
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *   1. Redistributions of source code must retain the above copyright notice,
+ *      this list of conditions and the following disclaimer.
+ *   2. Redistributions in binary form must reproduce the above copyright notice,
+ *      this list of conditions and the following disclaimer in the documentation
+ *      and/or other materials provided with the distribution.
+ *   3. Neither the name of STMicroelectronics nor the names of its contributors
+ *      may be used to endorse or promote products derived from this software
+ *      without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ ******************************************************************************
+ */
+
+
+/* Includes ------------------------------------------------------------------*/
+
+#include "MQTT_JS.h"
+
+/** onSubscribeCallback
+ * @brief	onSubscribeCallback.
+ */
+jerry_value_t MQTT_JS::onSubscribeCallback;
+
+/** Constructor
+ * @brief	Constructor.
+ */
+MQTT_JS::MQTT_JS(){
+    connack_rc = 0; // MQTT connack return code
+    ip_addr = NULL;
+    netConnecting = false;
+    connectTimeout = 1000;
+    mqttConnecting = false;
+    netConnected = false;
+    connected = false;
+    retryAttempt = 0;
+
+    client = NULL;
+    mqttNetwork = NULL;
+    
+    onSubscribeCallback = NULL;
+    
+}
+
+/** Destructor
+ * @brief	Destructor.
+ */
+MQTT_JS::~MQTT_JS(){
+    if(client){
+        delete client;
+        client = NULL;
+    }
+    if(mqttNetwork){
+        delete mqttNetwork;
+        mqttNetwork = NULL;
+    }
+}
+
+/** subscribe_cb
+ * @brief	Connects the subscription callback.
+ * @param	Message Data
+ */
+void MQTT_JS::subscribe_cb(MQTT::MessageData & msgMQTT) {
+    char msg[MQTT_MAX_PAYLOAD_SIZE];
+    msg[0]='\0';
+    strncat (msg, (char*)msgMQTT.message.payload, msgMQTT.message.payloadlen);
+    //printf ("--->>> subscribe_cb msg: %s\n\r", msg);
+
+    if (onSubscribeCallback && jerry_value_is_function(onSubscribeCallback)) {
+        
+        jerry_value_t this_val = jerry_create_undefined ();
+        const jerry_value_t args[1] = {
+            jerry_create_string ((const jerry_char_t *)msg)
+            //jerry_create_number(3)
+        };
+
+        jerry_value_t ret_val = jerry_call_function (onSubscribeCallback, this_val, args, 1);
+
+        if (!jerry_value_has_error_flag (ret_val))
+        {
+            // handle return value
+        }
+        jerry_release_value(args[0]);
+
+        jerry_release_value (ret_val);
+        jerry_release_value (this_val);
+        
+    }
+}
+
+/** onSubscribe
+ * @brief	Calls the subscription callback.
+ * @param	Jerry Callback
+ * @return  Return code
+ */
+int MQTT_JS::onSubscribe(jerry_value_t cb){
+    
+    if (jerry_value_is_function(cb)) {
+        onSubscribeCallback = cb;
+        return 0;
+    }
+    return 1;
+}
+
+/** subscribe
+ * @brief	Subscribes to the topic.
+ * @param	Topic
+ * @return  Return code
+ */
+int MQTT_JS::subscribe (char *_topic)
+{
+    strcpy(topic, _topic);
+    if(!topic){
+        return 1; // invalid topic
+    }
+    return client->subscribe(topic, MQTT::QOS1, subscribe_cb);
+}
+
+/** unsubscribe
+ * @brief	Unsubscribes the callback
+ * @param	Topic
+ * @return  Return code
+ */
+int MQTT_JS::unsubscribe(char *pubTopic)
+{
+    return client->unsubscribe(pubTopic);
+}
+
+
+/** init
+ * @brief	Initializes the MQTT.
+ * @param	NetworkInterface
+ * @param	ID
+ * @param	Token
+ * @param	URL
+ * @param	Port
+ * @return  Return code
+ */
+int MQTT_JS::init(NetworkInterface* network, char* _id, char* _token, char* _url, char* _port)
+{
+    sprintf (id, "%s", _id);
+    sprintf (auth_token, "%s", _token);
+    sprintf (hostname, "%s", _url);
+    sprintf (subscription_url, "%s", _url);
+    sprintf (port, "%s", _port);
+    
+    if (!network) {
+        printf ("Error easy_connect\n\r");
+        return -1;
+    }
+
+    mqttNetwork = new MQTTNetwork(network);
+    
+    client = new MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>(*mqttNetwork);
+
+    return 0;
+}
+
+/** connect
+ * @brief	Connects to the MQTT Server.
+ * @param	NetworkInterface
+ * @return  Return code
+ */
+int MQTT_JS::connect(NetworkInterface* network)
+{    
+    netConnecting = true;
+    int rc = mqttNetwork->connect(hostname, atoi(port));
+    if (rc != 0)
+    {
+        //WARN("IP Stack connect returned: %d\n", rc);    
+        return rc;
+    }
+    printf ("--->TCP Connected\n\r");
+    netConnected = true;
+    netConnecting = false;
+
+    // MQTT Connect
+    mqttConnecting = true;
+    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
+    data.MQTTVersion = 4;
+    data.struct_version=0;
+    data.clientID.cstring = id;
+    data.username.cstring = id;
+    data.password.cstring = auth_token;
+    data.keepAliveInterval = 15;  // in Sec    
+    if ((rc = client->connect(data)) == 0) 
+    {       
+        connected = true;
+        printf ("--->MQTT Connected\n\r");     
+    }
+    else {
+        WARN("MQTT connect returned %d\n", rc);        
+    }
+    if (rc >= 0)
+        connack_rc = rc;
+    mqttConnecting = false;
+    return rc;
+}
+
+/** getConnTimeout
+ * @brief	Returns the timeout in seconds.
+ * @param	Attempt number
+ * @return  Return code
+ */
+int MQTT_JS::getConnTimeout(int attemptNumber)
+{
+    // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute
+    // after 20 attempts, retry every 10 minutes
+    return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600;
+}
+
+/** attemptConnect
+ * @brief	Attempt connection to MQTT server.
+ * @param	NetworkInterface
+ */
+void MQTT_JS::attemptConnect(NetworkInterface* network) 
+{
+    connected = false;
+        
+    while (connect(network) != MQTT_CONNECTION_ACCEPTED) 
+    {    
+        if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD) {
+            printf ("File: %s, Line: %d Error: %d\n\r",__FILE__,__LINE__, connack_rc);        
+            return; // don't reattempt to connect if credentials are wrong
+        } 
+        int timeout = getConnTimeout(++retryAttempt);
+        WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout);
+        
+        // if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed
+        //  or maybe just add the proper members to do this disconnect and call attemptConnect(...)
+        
+        // this works - reset the system when the retry count gets to a threshold
+        if (retryAttempt == 5)
+            NVIC_SystemReset();
+        else
+            wait(timeout);
+    }
+}
+
+/** publish
+ * @brief	Publishes to the MQTT broker.
+ * @param	Data
+ * @param	Optional: retry number
+ * @return  Return code
+ */
+int MQTT_JS::publish(char* buf, int n)
+{
+    MQTT::Message message;
+    message.qos = MQTT::QOS0;
+    message.retained = false;
+    message.dup = false;
+    message.payload = (void*)buf;
+    message.payloadlen = strlen(buf);
+    
+    //LOG("Publishing %s\n\r", buf);
+    int result =  client->publish(topic, message);
+    if(result != 0){
+        if(n < 2){
+            printf("\33[31mCould not publish message. Trying again...\33[0m\n");
+            return publish(buf, n+1);
+        }
+        else{
+            printf("\33[31mError publishing message!\33[0m\n");
+            return result;
+        }
+    }
+    
+    /*
+    if(result == 0){
+        client->yield(5000);  // allow the MQTT client to receive messages
+    }
+    */
+    return result;
+} 
+
+
+/** yield
+ * @brief	Waits for the MQTT broker for subscription callback.
+ * @param	Time to wait
+ * @return  Return code
+ */
+int MQTT_JS::yield(int time)
+{
+    client->yield(time);  // allow the MQTT client to receive messages
+    return 0;
+} 
+    
+/** start_mqtt
+ * @brief	Starts a demo for MQTT.
+ * @param	NetworkInterface
+ * @return  Return code
+ */
+int MQTT_JS::start_mqtt(NetworkInterface* network)
+{   
+    sprintf (id, "hsojbpev");
+    sprintf (auth_token, "4H5vbg1KAhYi");
+    sprintf (hostname, "m20.cloudmqtt.com");
+    sprintf (subscription_url, "m20.cloudmqtt.com");
+    sprintf (port, "10023");
+
+    if (!network) {
+        printf ("Error easy_connect\n\r");
+        return -1;
+    }
+
+    mqttNetwork = new MQTTNetwork(network);
+    
+    client = new MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>(*mqttNetwork);
+
+    attemptConnect(network);   
+    if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD)    
+    {
+        while (true)
+        wait(1.0); // Permanent failures - don't retry
+    }
+    
+    int count = 0;    
+    while (true)
+    {
+        if (++count == 6)
+        {               
+            // Publish a message every ~3 second
+            if (publish((char*)"TestTest") != 0) { 
+                attemptConnect(network);   // if we have lost the connection                
+            }
+            count = 0;
+        }        
+        client->yield(500);  // allow the MQTT client to receive messages
+    }
+}