Provides JavaScript wrappers for MQTT.

Dependencies:   mbed-http DEVI2C_JS MQTTPacket FP

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTT_JS.cpp Source File

MQTT_JS.cpp

00001 /*
00002  * @file    MQTT_JS.cpp
00003  * @author  ST
00004  * @version V1.0.0
00005  * @date    9 October 2017
00006  * @brief   Implementation of MQTT for Javascript.
00007  ******************************************************************************
00008  * @attention
00009  *
00010  * <h2><center>&copy; COPYRIGHT(c) 2017 STMicroelectronics</center></h2>
00011  *
00012  * Redistribution and use in source and binary forms, with or without modification,
00013  * are permitted provided that the following conditions are met:
00014  *   1. Redistributions of source code must retain the above copyright notice,
00015  *      this list of conditions and the following disclaimer.
00016  *   2. Redistributions in binary form must reproduce the above copyright notice,
00017  *      this list of conditions and the following disclaimer in the documentation
00018  *      and/or other materials provided with the distribution.
00019  *   3. Neither the name of STMicroelectronics nor the names of its contributors
00020  *      may be used to endorse or promote products derived from this software
00021  *      without specific prior written permission.
00022  *
00023  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00024  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00025  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00026  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
00027  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00028  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
00029  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
00031  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00032  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00033  *
00034  ******************************************************************************
00035  */
00036 
00037 
00038 /* Includes ------------------------------------------------------------------*/
00039 
00040 #include "MQTT_JS.h"
00041 
00042 /** onSubscribeCallback
00043  * @brief   onSubscribeCallback.
00044  */
00045 jerry_value_t MQTT_JS::onSubscribeCallback;
00046 
00047 /** Constructor
00048  * @brief   Constructor.
00049  */
00050 MQTT_JS::MQTT_JS (){
00051     connack_rc = 0; // MQTT connack return code
00052     ip_addr = NULL;
00053     netConnecting = false;
00054     connectTimeout = 1000;
00055     mqttConnecting = false;
00056     netConnected = false;
00057     connected = false;
00058     retryAttempt = 0;
00059 
00060     client = NULL;
00061     mqttNetwork = NULL;
00062     
00063     onSubscribeCallback = NULL;
00064     
00065 }
00066 
00067 /** Destructor
00068  * @brief   Destructor.
00069  */
00070 MQTT_JS::~MQTT_JS (){
00071     if(client){
00072         delete client;
00073         client = NULL;
00074     }
00075     if(mqttNetwork){
00076         delete mqttNetwork;
00077         mqttNetwork = NULL;
00078     }
00079 }
00080 
00081 /** subscribe_cb
00082  * @brief   Connects the subscription callback.
00083  * @param   Message Data
00084  */
00085 void MQTT_JS::subscribe_cb(MQTT::MessageData & msgMQTT) {
00086     char msg[MQTT_MAX_PAYLOAD_SIZE];
00087     msg[0]='\0';
00088     strncat (msg, (char*)msgMQTT.message.payload, msgMQTT.message.payloadlen);
00089     //printf ("--->>> subscribe_cb msg: %s\n\r", msg);
00090 
00091     if (onSubscribeCallback && jerry_value_is_function(onSubscribeCallback)) {
00092         
00093         jerry_value_t this_val = jerry_create_undefined ();
00094         const jerry_value_t args[1] = {
00095             jerry_create_string ((const jerry_char_t *)msg)
00096             //jerry_create_number(3)
00097         };
00098 
00099         jerry_value_t ret_val = jerry_call_function (onSubscribeCallback, this_val, args, 1);
00100 
00101         if (!jerry_value_has_error_flag (ret_val))
00102         {
00103             // handle return value
00104         }
00105         jerry_release_value(args[0]);
00106 
00107         jerry_release_value (ret_val);
00108         jerry_release_value (this_val);
00109         
00110     }
00111 }
00112 
00113 /** onSubscribe
00114  * @brief   Calls the subscription callback.
00115  * @param   Jerry Callback
00116  * @return  Return code
00117  */
00118 int MQTT_JS::onSubscribe(jerry_value_t cb){
00119     
00120     if (jerry_value_is_function(cb)) {
00121         onSubscribeCallback = cb;
00122         return 0;
00123     }
00124     return 1;
00125 }
00126 
00127 /** subscribe
00128  * @brief   Subscribes to the topic.
00129  * @param   Topic
00130  * @return  Return code
00131  */
00132 int MQTT_JS::subscribe (char *_topic)
00133 {
00134     strcpy(topic, _topic);
00135     if(!topic){
00136         return 1; // invalid topic
00137     }
00138     return client->subscribe(topic, MQTT::QOS1, subscribe_cb);
00139 }
00140 
00141 /** unsubscribe
00142  * @brief   Unsubscribes the callback
00143  * @param   Topic
00144  * @return  Return code
00145  */
00146 int MQTT_JS::unsubscribe(char *pubTopic)
00147 {
00148     return client->unsubscribe(pubTopic);
00149 }
00150 
00151 
00152 /** init
00153  * @brief   Initializes the MQTT.
00154  * @param   NetworkInterface
00155  * @param   ID
00156  * @param   Token
00157  * @param   URL
00158  * @param   Port
00159  * @return  Return code
00160  */
00161 int MQTT_JS::init(NetworkInterface* network, char* _id, char* _token, char* _url, char* _port)
00162 {
00163     sprintf (id, "%s", _id);
00164     sprintf (auth_token, "%s", _token);
00165     sprintf (hostname, "%s", _url);
00166     sprintf (subscription_url, "%s", _url);
00167     sprintf (port, "%s", _port);
00168     
00169     if (!network) {
00170         printf ("Error easy_connect\n\r");
00171         return -1;
00172     }
00173 
00174     mqttNetwork = new MQTTNetwork(network);
00175     
00176     client = new MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE> (*mqttNetwork);
00177 
00178     return 0;
00179 }
00180 
00181 /** connect
00182  * @brief   Connects to the MQTT Server.
00183  * @param   NetworkInterface
00184  * @return  Return code
00185  */
00186 int MQTT_JS::connect(NetworkInterface* network)
00187 {    
00188     netConnecting = true;
00189     int rc = mqttNetwork->connect(hostname, atoi(port));
00190     if (rc != 0)
00191     {
00192         //WARN("IP Stack connect returned: %d\n", rc);    
00193         return rc;
00194     }
00195     printf ("--->TCP Connected\n\r");
00196     netConnected = true;
00197     netConnecting = false;
00198 
00199     // MQTT Connect
00200     mqttConnecting = true;
00201     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
00202     data.MQTTVersion = 4;
00203     data.struct_version=0;
00204     data.clientID.cstring = id;
00205     data.username.cstring = id;
00206     data.password.cstring = auth_token;
00207     data.keepAliveInterval = 15;  // in Sec    
00208     if ((rc = client->connect(data)) == 0) 
00209     {       
00210         connected = true;
00211         printf ("--->MQTT Connected\n\r");     
00212     }
00213     else {
00214         WARN("MQTT connect returned %d\n", rc);        
00215     }
00216     if (rc >= 0)
00217         connack_rc = rc;
00218     mqttConnecting = false;
00219     return rc;
00220 }
00221 
00222 /** getConnTimeout
00223  * @brief   Returns the timeout in seconds.
00224  * @param   Attempt number
00225  * @return  Return code
00226  */
00227 int MQTT_JS::getConnTimeout(int attemptNumber)
00228 {
00229     // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute
00230     // after 20 attempts, retry every 10 minutes
00231     return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600;
00232 }
00233 
00234 /** attemptConnect
00235  * @brief   Attempt connection to MQTT server.
00236  * @param   NetworkInterface
00237  */
00238 void MQTT_JS::attemptConnect(NetworkInterface* network) 
00239 {
00240     connected = false;
00241         
00242     while (connect(network) != MQTT_CONNECTION_ACCEPTED) 
00243     {    
00244         if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD) {
00245             printf ("File: %s, Line: %d Error: %d\n\r",__FILE__,__LINE__, connack_rc);        
00246             return; // don't reattempt to connect if credentials are wrong
00247         } 
00248         int timeout = getConnTimeout(++retryAttempt);
00249         WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout);
00250         
00251         // if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed
00252         //  or maybe just add the proper members to do this disconnect and call attemptConnect(...)
00253         
00254         // this works - reset the system when the retry count gets to a threshold
00255         if (retryAttempt == 5)
00256             NVIC_SystemReset();
00257         else
00258             wait(timeout);
00259     }
00260 }
00261 
00262 /** publish
00263  * @brief   Publishes to the MQTT broker.
00264  * @param   Data
00265  * @param   Optional: retry number
00266  * @return  Return code
00267  */
00268 int MQTT_JS::publish(char* buf, int n)
00269 {
00270     MQTT::Message message;
00271     message.qos = MQTT::QOS0;
00272     message.retained = false;
00273     message.dup = false;
00274     message.payload = (void*)buf;
00275     message.payloadlen = strlen(buf);
00276     
00277     //LOG("Publishing %s\n\r", buf);
00278     int result =  client->publish(topic, message);
00279     if(result != 0){
00280         if(n < 2){
00281             printf("\33[31mCould not publish message. Trying again...\33[0m\n");
00282             return publish(buf, n+1);
00283         }
00284         else{
00285             printf("\33[31mError publishing message!\33[0m\n");
00286             return result;
00287         }
00288     }
00289     
00290     /*
00291     if(result == 0){
00292         client->yield(5000);  // allow the MQTT client to receive messages
00293     }
00294     */
00295     return result;
00296 } 
00297 
00298 
00299 /** yield
00300  * @brief   Waits for the MQTT broker for subscription callback.
00301  * @param   Time to wait
00302  * @return  Return code
00303  */
00304 int MQTT_JS::yield(int time)
00305 {
00306     client->yield(time);  // allow the MQTT client to receive messages
00307     return 0;
00308 } 
00309     
00310 /** start_mqtt
00311  * @brief   Starts a demo for MQTT.
00312  * @param   NetworkInterface
00313  * @return  Return code
00314  */
00315 int MQTT_JS::start_mqtt(NetworkInterface* network)
00316 {   
00317     sprintf (id, "hsojbpev");
00318     sprintf (auth_token, "4H5vbg1KAhYi");
00319     sprintf (hostname, "m20.cloudmqtt.com");
00320     sprintf (subscription_url, "m20.cloudmqtt.com");
00321     sprintf (port, "10023");
00322 
00323     if (!network) {
00324         printf ("Error easy_connect\n\r");
00325         return -1;
00326     }
00327 
00328     mqttNetwork = new MQTTNetwork(network);
00329     
00330     client = new MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE> (*mqttNetwork);
00331 
00332     attemptConnect(network);   
00333     if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD)    
00334     {
00335         while (true)
00336         wait(1.0); // Permanent failures - don't retry
00337     }
00338     
00339     int count = 0;    
00340     while (true)
00341     {
00342         if (++count == 6)
00343         {               
00344             // Publish a message every ~3 second
00345             if (publish((char*)"TestTest") != 0) { 
00346                 attemptConnect(network);   // if we have lost the connection                
00347             }
00348             count = 0;
00349         }        
00350         client->yield(500);  // allow the MQTT client to receive messages
00351     }
00352 }