mqtt specific components for the impact mbed endpoint library

Dependents:   mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp

Revision:
49:965a65122c0f
Parent:
43:14ccc830d6a6
--- a/MQTTTransport.cpp	Tue Jul 01 22:01:09 2014 +0000
+++ b/MQTTTransport.cpp	Tue Jul 08 14:59:39 2014 +0000
@@ -26,44 +26,36 @@
  // EmulatedResourceFactory support
  #include "EmulatedResourceFactory.h"
  
- #ifdef NETWORK_MUTEX
- #include "rtos.h"
- extern Mutex *network_mutex;
- #endif
-  
- // our transmitt instance 
- MQTTTransport *_mqtt_instance =  NULL;
-
+ // our instance pointer
+ MQTTTransport *_mqtt_instance = NULL;
+   
  // MQTT callback to handle received messages
- void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
+ void _mqtt_message_handler(MQTT::MessageData& md) {
     char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
     char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
     
+    MQTT::Message &message = md.message;
+    
     memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
     memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
-    memcpy(buffer,payload,length);
-    strcpy(rcv_topic,topic);
+    
+    int length = message.payloadlen;
+    if (length > MAX_MQTT_MESSAGE_LENGTH) length = MAX_MQTT_MESSAGE_LENGTH;
+    memcpy(buffer,message.payload,length);
+    strcpy(rcv_topic,(char *)md.topicName.cstring);
     
     if (_mqtt_instance != NULL) {
-        if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
+        if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length))
             _mqtt_instance->processPongMessage(buffer,length);
-        }
-        else {
-            memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
-            memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
-            memcpy(buffer,payload,length);
-            strcpy(rcv_topic,topic);
+        else 
             _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
-        }
     }
  }
- 
- // our MQTT client endpoint
- PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
-  
+   
  // default constructor
  MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) {
-     this->m_mqtt = NULL;
+     this->m_ipstack = new MQTTEthernet();
+     this->m_mqtt =  new MQTT::Client<MQTTEthernet, Countdown>(*this->m_ipstack);
      _mqtt_instance = this;
      this->m_map = map;
      this->m_ping_counter = 1;
@@ -280,14 +272,18 @@
          memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
          char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED;
          sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success);
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->lock();
- #endif
-         bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->unlock();
- #endif
-         if (sent) {
+         
+         // build out the message
+         MQTT::Message mqtt_message;
+         mqtt_message.qos = MQTT::QOS0;
+         mqtt_message.retained = false;
+         mqtt_message.dup = false;
+         mqtt_message.payload = (void*)message;
+         mqtt_message.payloadlen = strlen(message)+1;
+    
+         // publish...
+         int sent = this->m_mqtt->publish(this->getTopic(),&mqtt_message);
+         if (sent == 0) {
              this->logger()->log("Result sent successfully");
              this->logger()->blinkTransportTxLED();
          }
@@ -369,14 +365,18 @@
      
      // send the message over the ping/pong topic
      //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter);
- #ifdef NETWORK_MUTEX
-     if (network_mutex != NULL) network_mutex->lock();
- #endif
-     sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message));
- #ifdef NETWORK_MUTEX
-     if (network_mutex != NULL) network_mutex->unlock();
- #endif
-     if (sent) {
+     
+     // build out the message
+     MQTT::Message mqtt_message;
+     mqtt_message.qos = MQTT::QOS0;
+     mqtt_message.retained = false;
+     mqtt_message.dup = false;
+     mqtt_message.payload = (void*)message;
+     mqtt_message.payloadlen = strlen(message)+1;
+    
+     // publish...
+     sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,&mqtt_message);
+     if (sent == 0) {
          // send succeeded
          //this->logger()->log("PING %d sent successfully",this->m_ping_counter);
          this->logger()->blinkTransportTxLED();
@@ -409,61 +409,62 @@
  bool MQTTTransport::connect() {
      memset(_mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
      if (this->m_connected == false) {
-         this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
-         this->m_mqtt = &_mqtt;
-         if (this->m_mqtt != NULL) {
-             char *id = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id);
-             this->logger()->log("MQTT Connect: ID: %s...",id);
-             if (this->m_mqtt->connect(id)) {
-                 this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
-                 if (this->m_mqtt->subscribe(this->getTopic())) {
-                    if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) {
+        this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT); 
+        if (this->m_ipstack->connect(MQTT_HOSTNAME,MQTT_HOSTPORT)) {
+             
+            MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
+            data.MQTTVersion = 3;
+            data.clientID.cstring = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id);
+            this->logger()->log("MQTT Connect: ID: %s...",data.clientID.cstring);
+            
+            int rc = this->m_mqtt->connect(&data);
+            if (rc != 0) {
+                this->logger()->log("Error from MQTT connect: %d", rc);
+            }
+            else {
+                this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
+                if ((rc = this->m_mqtt->subscribe(this->getTopic(),MQTT::QOS1,_mqtt_message_handler)) != 0) {
+                    this->logger()->log("MQTT Subscribe: Topic: %s FAILED: %d", this->getTopic(),rc);
+                    this->logger()->turnLEDRed();
+                    this->m_connected = false;
+                }
+                else {
+                    if ((rc = this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC, MQTT::QOS1,_mqtt_message_handler)) == 0) {
                         this->logger()->log("MQTT CONNECTED.");
                         this->m_connected = true;
                     }
                     else {
-                        this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC);
+                        this->logger()->log("MQTT Subscribe(ALL): Topic: %s FAILED: %d", MQTT_IOC_ALL_TOPIC,rc);
                         this->logger()->turnLEDRed();
                         this->m_connected = false;
                     }
-                 }
-                 else {
-                     this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
-                     this->logger()->turnLEDRed();
-                     this->m_connected = false;
-                 }
-             }
-             else {
-                 this->logger()->log("MQTT Connect: ID: %s FAILED",id);
-                 this->logger()->turnLEDRed();
-                 this->m_connected = false;
-             }
-         }
-         else {
-             this->logger()->log("MQTT Unable to allocate new instance");
+                }
+            }
+        }
+        else {
+             this->logger()->log("MQTT Connect FAILED");
              this->logger()->turnLEDRed();
              this->m_connected = false;
-         }
-     }
-     else {
+        }
+    }
+    else {
          this->logger()->log("MQTT already connected (OK)");
-     }
-     return this->m_connected;
+    }
+    return this->m_connected;
  }
  
  // disconnect from MQTT
  bool MQTTTransport::disconnect() {
      if (this->m_mqtt != NULL) {
          this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->lock();
- #endif
-         this->m_mqtt->unsubscribe(this->getTopic());
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->unlock();
- #endif
+         int rc = this->m_mqtt->unsubscribe(this->getTopic());
+         if (rc != 0)
+            this->logger()->log("Error in unsubscribing from %s ErrorCode: %d", this->getTopic(),rc);
+         if ((rc = this->m_mqtt->unsubscribe(MQTT_IOC_ALL_TOPIC)) != 0)
+            this->logger()->log("Error in unsubscribing from %s ErrorCode: %d", MQTT_IOC_ALL_TOPIC,rc);
          this->logger()->log("MQTT Disconnecting...");
-         this->m_mqtt->disconnect();
+         if ((rc = this->m_mqtt->disconnect()) != 0)
+            this->logger()->log("Error from disconnect: %d", rc);
      }
      else {
          this->logger()->log("MQTT already disconnected (OK)");
@@ -476,13 +477,7 @@
  void MQTTTransport::checkAndProcess() {
      // process any MQTT messages
      if (this->m_mqtt != NULL && this->m_connected == true) {
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->lock();
- #endif
-         bool connected = this->m_mqtt->loop();
- #ifdef NETWORK_MUTEX
-         if (network_mutex != NULL) network_mutex->unlock();
- #endif
+         bool connected = this->m_mqtt->yield(200);
          if (connected) {
             this->logger()->blinkTransportRxLED();
          }