mqtt specific components for the impact mbed endpoint library
Dependents: mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp
Diff: MQTTTransport.cpp
- Revision:
- 49:965a65122c0f
- Parent:
- 43:14ccc830d6a6
diff -r 7192a7a4edcd -r 965a65122c0f MQTTTransport.cpp --- a/MQTTTransport.cpp Tue Jul 01 22:01:09 2014 +0000 +++ b/MQTTTransport.cpp Tue Jul 08 14:59:39 2014 +0000 @@ -26,44 +26,36 @@ // EmulatedResourceFactory support #include "EmulatedResourceFactory.h" - #ifdef NETWORK_MUTEX - #include "rtos.h" - extern Mutex *network_mutex; - #endif - - // our transmitt instance - MQTTTransport *_mqtt_instance = NULL; - + // our instance pointer + MQTTTransport *_mqtt_instance = NULL; + // MQTT callback to handle received messages - void _mqtt_message_handler(char *topic,char *payload,unsigned int length) { + void _mqtt_message_handler(MQTT::MessageData& md) { char buffer[MAX_MQTT_MESSAGE_LENGTH+1]; char rcv_topic[MQTT_IOC_TOPIC_LEN+1]; + MQTT::Message &message = md.message; + memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1); - memcpy(buffer,payload,length); - strcpy(rcv_topic,topic); + + int length = message.payloadlen; + if (length > MAX_MQTT_MESSAGE_LENGTH) length = MAX_MQTT_MESSAGE_LENGTH; + memcpy(buffer,message.payload,length); + strcpy(rcv_topic,(char *)md.topicName.cstring); if (_mqtt_instance != NULL) { - if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) { + if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) _mqtt_instance->processPongMessage(buffer,length); - } - else { - memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); - memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1); - memcpy(buffer,payload,length); - strcpy(rcv_topic,topic); + else _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length); - } } } - - // our MQTT client endpoint - PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler); - + // default constructor MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) { - this->m_mqtt = NULL; + this->m_ipstack = new MQTTEthernet(); + this->m_mqtt = new MQTT::Client<MQTTEthernet, Countdown>(*this->m_ipstack); _mqtt_instance = this; this->m_map = map; this->m_ping_counter = 1; @@ -280,14 +272,18 @@ memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1); char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED; sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success); - #ifdef NETWORK_MUTEX - if (network_mutex != NULL) network_mutex->lock(); - #endif - bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message)); - #ifdef NETWORK_MUTEX - if (network_mutex != NULL) network_mutex->unlock(); - #endif - if (sent) { + + // build out the message + MQTT::Message mqtt_message; + mqtt_message.qos = MQTT::QOS0; + mqtt_message.retained = false; + mqtt_message.dup = false; + mqtt_message.payload = (void*)message; + mqtt_message.payloadlen = strlen(message)+1; + + // publish... + int sent = this->m_mqtt->publish(this->getTopic(),&mqtt_message); + if (sent == 0) { this->logger()->log("Result sent successfully"); this->logger()->blinkTransportTxLED(); } @@ -369,14 +365,18 @@ // send the message over the ping/pong topic //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter); - #ifdef NETWORK_MUTEX - if (network_mutex != NULL) network_mutex->lock(); - #endif - sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message)); - #ifdef NETWORK_MUTEX - if (network_mutex != NULL) network_mutex->unlock(); - #endif - if (sent) { + + // build out the message + MQTT::Message mqtt_message; + mqtt_message.qos = MQTT::QOS0; + mqtt_message.retained = false; + mqtt_message.dup = false; + mqtt_message.payload = (void*)message; + mqtt_message.payloadlen = strlen(message)+1; + + // publish... + sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,&mqtt_message); + if (sent == 0) { // send succeeded //this->logger()->log("PING %d sent successfully",this->m_ping_counter); this->logger()->blinkTransportTxLED(); @@ -409,61 +409,62 @@ bool MQTTTransport::connect() { memset(_mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1)); if (this->m_connected == false) { - this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT); - this->m_mqtt = &_mqtt; - if (this->m_mqtt != NULL) { - char *id = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id); - this->logger()->log("MQTT Connect: ID: %s...",id); - if (this->m_mqtt->connect(id)) { - this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic()); - if (this->m_mqtt->subscribe(this->getTopic())) { - if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) { + this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT); + if (this->m_ipstack->connect(MQTT_HOSTNAME,MQTT_HOSTPORT)) { + + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + data.MQTTVersion = 3; + data.clientID.cstring = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id); + this->logger()->log("MQTT Connect: ID: %s...",data.clientID.cstring); + + int rc = this->m_mqtt->connect(&data); + if (rc != 0) { + this->logger()->log("Error from MQTT connect: %d", rc); + } + else { + this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic()); + if ((rc = this->m_mqtt->subscribe(this->getTopic(),MQTT::QOS1,_mqtt_message_handler)) != 0) { + this->logger()->log("MQTT Subscribe: Topic: %s FAILED: %d", this->getTopic(),rc); + this->logger()->turnLEDRed(); + this->m_connected = false; + } + else { + if ((rc = this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC, MQTT::QOS1,_mqtt_message_handler)) == 0) { this->logger()->log("MQTT CONNECTED."); this->m_connected = true; } else { - this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC); + this->logger()->log("MQTT Subscribe(ALL): Topic: %s FAILED: %d", MQTT_IOC_ALL_TOPIC,rc); this->logger()->turnLEDRed(); this->m_connected = false; } - } - else { - this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic()); - this->logger()->turnLEDRed(); - this->m_connected = false; - } - } - else { - this->logger()->log("MQTT Connect: ID: %s FAILED",id); - this->logger()->turnLEDRed(); - this->m_connected = false; - } - } - else { - this->logger()->log("MQTT Unable to allocate new instance"); + } + } + } + else { + this->logger()->log("MQTT Connect FAILED"); this->logger()->turnLEDRed(); this->m_connected = false; - } - } - else { + } + } + else { this->logger()->log("MQTT already connected (OK)"); - } - return this->m_connected; + } + return this->m_connected; } // disconnect from MQTT bool MQTTTransport::disconnect() { if (this->m_mqtt != NULL) { this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic()); - #ifdef NETWORK_MUTEX - if (network_mutex != NULL) network_mutex->lock(); - #endif - this->m_mqtt->unsubscribe(this->getTopic()); - #ifdef NETWORK_MUTEX - if (network_mutex != NULL) network_mutex->unlock(); - #endif + int rc = this->m_mqtt->unsubscribe(this->getTopic()); + if (rc != 0) + this->logger()->log("Error in unsubscribing from %s ErrorCode: %d", this->getTopic(),rc); + if ((rc = this->m_mqtt->unsubscribe(MQTT_IOC_ALL_TOPIC)) != 0) + this->logger()->log("Error in unsubscribing from %s ErrorCode: %d", MQTT_IOC_ALL_TOPIC,rc); this->logger()->log("MQTT Disconnecting..."); - this->m_mqtt->disconnect(); + if ((rc = this->m_mqtt->disconnect()) != 0) + this->logger()->log("Error from disconnect: %d", rc); } else { this->logger()->log("MQTT already disconnected (OK)"); @@ -476,13 +477,7 @@ void MQTTTransport::checkAndProcess() { // process any MQTT messages if (this->m_mqtt != NULL && this->m_connected == true) { - #ifdef NETWORK_MUTEX - if (network_mutex != NULL) network_mutex->lock(); - #endif - bool connected = this->m_mqtt->loop(); - #ifdef NETWORK_MUTEX - if (network_mutex != NULL) network_mutex->unlock(); - #endif + bool connected = this->m_mqtt->yield(200); if (connected) { this->logger()->blinkTransportRxLED(); }