mqtt specific components for the impact mbed endpoint library

Dependents:   mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp

Files at this revision

API Documentation at this revision

Comitter:
ansond
Date:
Tue Jul 08 14:59:39 2014 +0000
Parent:
48:7192a7a4edcd
Commit message:
ported to new MQTT library

Changed in this revision

MQTTTransport.cpp Show annotated file Show diff for this revision Revisions of this file
MQTTTransport.h Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTTransport.cpp	Tue Jul 01 22:01:09 2014 +0000
+++ b/MQTTTransport.cpp	Tue Jul 08 14:59:39 2014 +0000
@@ -26,44 +26,36 @@
  // EmulatedResourceFactory support
  #include "EmulatedResourceFactory.h"
  
- #ifdef NETWORK_MUTEX
- #include "rtos.h"
- extern Mutex *network_mutex;
- #endif
-  
- // our transmitt instance 
- MQTTTransport *_mqtt_instance =  NULL;
-
+ // our instance pointer
+ MQTTTransport *_mqtt_instance = NULL;
+   
  // MQTT callback to handle received messages
- void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
+ void _mqtt_message_handler(MQTT::MessageData& md) {
     char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
     char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
     
+    MQTT::Message &message = md.message;
+    
     memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
     memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
-    memcpy(buffer,payload,length);
-    strcpy(rcv_topic,topic);
+    
+    int length = message.payloadlen;
+    if (length > MAX_MQTT_MESSAGE_LENGTH) length = MAX_MQTT_MESSAGE_LENGTH;
+    memcpy(buffer,message.payload,length);
+    strcpy(rcv_topic,(char *)md.topicName.cstring);
     
     if (_mqtt_instance != NULL) {
-        if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
+        if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length))
             _mqtt_instance->processPongMessage(buffer,length);
-        }
-        else {
-            memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
-            memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
-            memcpy(buffer,payload,length);
-            strcpy(rcv_topic,topic);
+        else 
             _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
-        }
     }
  }
- 
- // our MQTT client endpoint
- PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
-  
+   
  // default constructor
  MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) {
-     this->m_mqtt = NULL;
+     this->m_ipstack = new MQTTEthernet();
+     this->m_mqtt =  new MQTT::Client<MQTTEthernet, Countdown>(*this->m_ipstack);
      _mqtt_instance = this;
      this->m_map = map;
      this->m_ping_counter = 1;
@@ -280,14 +272,18 @@
          memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
          char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED;
          sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success);
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->lock();
- #endif
-         bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->unlock();
- #endif
-         if (sent) {
+         
+         // build out the message
+         MQTT::Message mqtt_message;
+         mqtt_message.qos = MQTT::QOS0;
+         mqtt_message.retained = false;
+         mqtt_message.dup = false;
+         mqtt_message.payload = (void*)message;
+         mqtt_message.payloadlen = strlen(message)+1;
+    
+         // publish...
+         int sent = this->m_mqtt->publish(this->getTopic(),&mqtt_message);
+         if (sent == 0) {
              this->logger()->log("Result sent successfully");
              this->logger()->blinkTransportTxLED();
          }
@@ -369,14 +365,18 @@
      
      // send the message over the ping/pong topic
      //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter);
- #ifdef NETWORK_MUTEX
-     if (network_mutex != NULL) network_mutex->lock();
- #endif
-     sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message));
- #ifdef NETWORK_MUTEX
-     if (network_mutex != NULL) network_mutex->unlock();
- #endif
-     if (sent) {
+     
+     // build out the message
+     MQTT::Message mqtt_message;
+     mqtt_message.qos = MQTT::QOS0;
+     mqtt_message.retained = false;
+     mqtt_message.dup = false;
+     mqtt_message.payload = (void*)message;
+     mqtt_message.payloadlen = strlen(message)+1;
+    
+     // publish...
+     sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,&mqtt_message);
+     if (sent == 0) {
          // send succeeded
          //this->logger()->log("PING %d sent successfully",this->m_ping_counter);
          this->logger()->blinkTransportTxLED();
@@ -409,61 +409,62 @@
  bool MQTTTransport::connect() {
      memset(_mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
      if (this->m_connected == false) {
-         this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
-         this->m_mqtt = &_mqtt;
-         if (this->m_mqtt != NULL) {
-             char *id = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id);
-             this->logger()->log("MQTT Connect: ID: %s...",id);
-             if (this->m_mqtt->connect(id)) {
-                 this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
-                 if (this->m_mqtt->subscribe(this->getTopic())) {
-                    if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) {
+        this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT); 
+        if (this->m_ipstack->connect(MQTT_HOSTNAME,MQTT_HOSTPORT)) {
+             
+            MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
+            data.MQTTVersion = 3;
+            data.clientID.cstring = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id);
+            this->logger()->log("MQTT Connect: ID: %s...",data.clientID.cstring);
+            
+            int rc = this->m_mqtt->connect(&data);
+            if (rc != 0) {
+                this->logger()->log("Error from MQTT connect: %d", rc);
+            }
+            else {
+                this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
+                if ((rc = this->m_mqtt->subscribe(this->getTopic(),MQTT::QOS1,_mqtt_message_handler)) != 0) {
+                    this->logger()->log("MQTT Subscribe: Topic: %s FAILED: %d", this->getTopic(),rc);
+                    this->logger()->turnLEDRed();
+                    this->m_connected = false;
+                }
+                else {
+                    if ((rc = this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC, MQTT::QOS1,_mqtt_message_handler)) == 0) {
                         this->logger()->log("MQTT CONNECTED.");
                         this->m_connected = true;
                     }
                     else {
-                        this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC);
+                        this->logger()->log("MQTT Subscribe(ALL): Topic: %s FAILED: %d", MQTT_IOC_ALL_TOPIC,rc);
                         this->logger()->turnLEDRed();
                         this->m_connected = false;
                     }
-                 }
-                 else {
-                     this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
-                     this->logger()->turnLEDRed();
-                     this->m_connected = false;
-                 }
-             }
-             else {
-                 this->logger()->log("MQTT Connect: ID: %s FAILED",id);
-                 this->logger()->turnLEDRed();
-                 this->m_connected = false;
-             }
-         }
-         else {
-             this->logger()->log("MQTT Unable to allocate new instance");
+                }
+            }
+        }
+        else {
+             this->logger()->log("MQTT Connect FAILED");
              this->logger()->turnLEDRed();
              this->m_connected = false;
-         }
-     }
-     else {
+        }
+    }
+    else {
          this->logger()->log("MQTT already connected (OK)");
-     }
-     return this->m_connected;
+    }
+    return this->m_connected;
  }
  
  // disconnect from MQTT
  bool MQTTTransport::disconnect() {
      if (this->m_mqtt != NULL) {
          this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->lock();
- #endif
-         this->m_mqtt->unsubscribe(this->getTopic());
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->unlock();
- #endif
+         int rc = this->m_mqtt->unsubscribe(this->getTopic());
+         if (rc != 0)
+            this->logger()->log("Error in unsubscribing from %s ErrorCode: %d", this->getTopic(),rc);
+         if ((rc = this->m_mqtt->unsubscribe(MQTT_IOC_ALL_TOPIC)) != 0)
+            this->logger()->log("Error in unsubscribing from %s ErrorCode: %d", MQTT_IOC_ALL_TOPIC,rc);
          this->logger()->log("MQTT Disconnecting...");
-         this->m_mqtt->disconnect();
+         if ((rc = this->m_mqtt->disconnect()) != 0)
+            this->logger()->log("Error from disconnect: %d", rc);
      }
      else {
          this->logger()->log("MQTT already disconnected (OK)");
@@ -476,13 +477,7 @@
  void MQTTTransport::checkAndProcess() {
      // process any MQTT messages
      if (this->m_mqtt != NULL && this->m_connected == true) {
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->lock();
- #endif
-         bool connected = this->m_mqtt->loop();
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->unlock();
- #endif
+         bool connected = this->m_mqtt->yield(200);
          if (connected) {
             this->logger()->blinkTransportRxLED();
          }
--- a/MQTTTransport.h	Tue Jul 01 22:01:09 2014 +0000
+++ b/MQTTTransport.h	Tue Jul 08 14:59:39 2014 +0000
@@ -23,19 +23,21 @@
 #include "Transport.h"
 
 // MQTT Support
-#include "PubSubClient.h"
+#include "MQTTEthernet.h"
+#include "MQTTClient.h"
 
 // MBED to IOC Resource Map
 #include "MBEDToIOCResourceMap.h"
 
 class MQTTTransport : public Transport {
     private:
-        PubSubClient         *m_mqtt;
-        MBEDToIOCResourceMap *m_map;
-        char                  m_topic[MQTT_IOC_TOPIC_LEN+1];
-        char                  m_endpoint_name[PERSONALITY_NAME_LEN+1];
-        int                   m_ping_counter;
-        int                   m_ping_countdown;
+        MQTTEthernet                           *m_ipstack;
+        MQTT::Client<MQTTEthernet, Countdown>  *m_mqtt;
+        MBEDToIOCResourceMap                   *m_map;
+        char                                    m_topic[MQTT_IOC_TOPIC_LEN+1];
+        char                                    m_endpoint_name[PERSONALITY_NAME_LEN+1];
+        int                                     m_ping_counter;
+        int                                     m_ping_countdown;
         
     public:
         MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map);