Example of AWS IoT connection and Web Dashboard thru STM32 Nucleo evaluation board and mbed OS.

Dependencies:   X_NUCLEO_IKS01A1 mbed FP MQTTPacket DnsQuery ATParser

Introduction

The demo is aimed to STM32 Nucleo board with WiFi and sensors expansions. The board is a "thing" for the AWS IoT service. It updates IoT service shadow with sensors data every second and checks subscription messages.

Hardware Configuration

https://github.com/Klika-Tech/nucleo-aws-iot-demo/raw/master/doc/assets/device.jpg

Software Configuration

  • Import this Project to mbed online compiler
  • Find the next part of code in main.cpp file ...

WiFi network credential

#include "mbed.h"
// WiFi network credential
#define SSID   ""  // Network must be visible otherwise it can't connect
#define PASSW  ""
#error "Wifi SSID & password empty"
  • ... And set it to your Network Name and Password. Do not forget to remove "#error" pragma line.

Information

Nucleo WiFi module is not the same as your smartphone or laptope - it is based on demo board. To avoid connection problems:

  1. Place Nucleo as close to WiFi hot spot as possible. Or...
  2. Turn on mobile hot spot in your laptop as close to the device as possible.
  3. Make sure that hot spot permits 2.4 GHz band communications
  • Setup BackEnd and store certificates using this backend setup instruction
  • Find AWS_IOT_MQTT_HOST define and change it to HTTPS point mentioned in your AWS IoT thing properties named "interact"

#define AWS_IOT_MQTT_HOST              "xxxxxxxxxx.iot.us-east-1.amazonaws.com" //Use your own host.
  • Find the certificate defines clientCRT and clientKey in main.cpp file and change it to ones provided by Amazon.

/**********************************************************************************************
***********************************************************************************************
				Device Identity Certificates: Modify for your AWS IoT Thing
***********************************************************************************************
***********************************************************************************************/

/****************************************
(somecode)-certificate.pem.crt - Amazon signed PEM sertificate.
*****************************************/

//This Client cert is example. Use own instead.
const uint8_t clientCRT[] = "\
-----BEGIN CERTIFICATE-----\n\
MIIDBjCCAe6gAwIBAgIUVph856omeIxW3UPioq+UrX1DbwowDQYJKoZIhvcNAQEL\
BQAwTTFLMEkGA1UECwxCQW1hem9uIFdlYiBTZXJ2aWNlcyBPPUFtYXpvbi5jb20g\
SW5jLiBMPVNlYXR0bGUgU1Q9V2FzaGluZ3RvbiBDPVVTMB4XDTE3MDUyNTExNTEy\
OVoXDTQ5MTIzMTIzNTk1OVowgZUxCzAJBgNVBAYTAkJZMQ4wDAYDVQQIDAVNaW5z\
azEOMAwGA1UEBwwFTWluc2sxFzAVBgNVBAoMDktsaWthLVRlY2ggTExDMRcwFQYD\
VQQLDA5LbGlrYS1UZWNoIExMQzEMMAoGA1UEAwwDUm5EMSYwJAYJKoZIhvcNAQkB\
FhdtdmF0YWxldUBrbGlrYS10ZWNoLmNvbTBZMBMGByqGSM49AgEGCCqGSM49AwEH\
A0IABCJgOQJmoTBJVPfli9Hm/JVixaxkY5rtlgrYO3hSl633A2hg0P/ue0wXDbF3\
aQ0X57IRFE4k4FEbr3UXjT/IczKjYDBeMB8GA1UdIwQYMBaAFK3YzTUPlYB2Li75\
i/z8rEogr1d6MB0GA1UdDgQWBBT18HXBaXFJuAR/0SwegnxJ+pyJ6TAMBgNVHRMB\
Af8EAjAAMA4GA1UdDwEB/wQEAwIHgDANBgkqhkiG9w0BAQsFAAOCAQEAb0Ux1aH5\
RLxjrfGqXN6rPVqh8QQRS+AyBfzmaQN8HaPZMkX5WxXLvcn0A3uWlwQxPPkcZ4zf\
51GHtFFQWB4YZ8dx8mUQ0v/j7onHjCJgZ8iDgwOyKMGtnsDZWCakQw+a6cj+NrMZ\
tzhjwCzEEP6ePcbXwErI5OOzLuWns2L/JEr2wWNkokgRuS8ewr/SQ9OLWIWa2rFM\
ahPNTb3y/qBeWdjeJmhI+TOxdqIpsF8roWP25zwo/zkzCHCjXFBrL+0CA4MpxIl9\
x02i7aAhlJ6ys80lDxdeWeeQJXRKkGknP8mcmKn3iEqqJ5s1dQePj2b5d3ldatya\
wsxQBqqZXzIWEw==\
\n\
-----END CERTIFICATE-----\n";



/**********************************************************************************************
***********************************************************************************************
						Private Key: Modify for your AWS IoT Thing
***********************************************************************************************
***********************************************************************************************/

/********************************************************************8****************************************
nucleo.key.pem - client key generated according to readme.
**************************************************************************************************************/

//This Client Key is example. Use own instead.
const uint8_t clientKey[] ="\
-----BEGIN EC PARAMETERS-----\n\
BggqhkjOPQMBBw==\
-----END EC PARAMETERS-----\n\
-----BEGIN EC PRIVATE KEY-----\n\
MHcCAQEEIHPRfWSC8/k/BsqDWKuP15dXsI9fGwpkTIsLZe6mIrAAoAoGCCqGSM49\
AwEHoUQDQgAEImA5AmahMElU9+WL0eb8lWLFrGRjmu2WCtg7eFKXrfcDaGDQ/+57\
TBcNsXdpDRfnshEUTiTgURuvdReNP8hzMg==\
-----END EC PRIVATE KEY-----\n";

Build and Check

  1. Plugin your board to USB of your PC. USB Disk Drive and USB COM Port should appear in your system.
  2. Open any Serial Console, connect it to your USB Serial Port and setup speed equal to 115200.
  3. Compile this Project and save .bin file to USB Disk Drive
  4. After board reset you should see next log in serial console:

X-NUCLEO-IDW01M1 mbed Application

connecting to AP
LOG:   int main() L#361 Connected to WiFI.
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#186 =====================================
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#187 Connecting WiFi.
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#188 Nucleo IP ADDRESS: X.X.X.X
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#189 Nucleo MAC ADDRESS: 00:11:22:33:44:55
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#190 Server Hostname: xxxxxxxx.iot.us-east-1.amazonaws.com port: 8883
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#191 Client ID: Nucleo
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#194 =====================================
LOG:   int MQTTSocket::getNTPtime(int) L#58 Success receiving time from ntp server. Tick from 1 Jan 1970 is equal to 1505399292.
--->TCP Connected
--->MQTT Connected
--->>>MQTT subscribed to: Nucleo/test
Length - 245, Publishing {"state": {"reported": {"temperature": 23.690001, "humidity": 98.190002, "pressure": 982.869141, "accelerometer": [-0.009000, 0.030000, 0.971000], "gyroscope": [0.420000, -2.660000, 1.750000], "magnetometer": [-3.600000, -7.100000, 53.300000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.660000, "humidity": 98.010002, "pressure": 982.770264, "accelerometer": [-0.009000, 0.030000, 0.971000], "gyroscope": [0.770000, -2.310000, 1.470000], "magnetometer": [-3.100000, -8.300000, 54.200000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.670000, "humidity": 98.129997, "pressure": 982.724121, "accelerometer": [-0.008000, 0.029000, 0.971000], "gyroscope": [0.630000, -2.380000, 1.400000], "magnetometer": [-3.100000, -7.900000, 53.400000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.690001, "humidity": 98.019997, "pressure": 982.840088, "accelerometer": [-0.009000, 0.030000, 0.972000], "gyroscope": [0.700000, -2.450000, 1.540000], "magnetometer": [-3.700000, -7.900000, 53.400000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.709999, "humidity": 98.040001, "pressure": 982.828613, "accelerometer": [-0.009000, 0.030000, 0.971000], "gyroscope": [0.630000, -2.520000, 1.470000], "magnetometer": [-2.900000, -7.400000, 52.400000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.719999, "humidity": 97.860001, "pressure": 982.917236, "accelerometer": [-0.026000, 0.103000, 0.891000], "gyroscope": [1.050000, -2.310000, 1.260000], "magnetometer": [-3.300000, -7.100000, 53.500000]}}}

Information

Device connection state might be checked by Green Led on the board. Green light means that device is connected and transferring data to cloud.

  1. Configure and start your dashboard using instruction and corresponding sources from github
  2. Use Blue button to set up markers to charts.
  3. Use AWS IoT console MQTT Client to test device subscription to "Nucleo/test". Just publish any message to this topic and serial port output.
  4. PROFIT!

MQTT/MQTTAsync.h

Committer:
PavelSavyhin
Date:
2017-10-19
Revision:
1:042ca9148926
Parent:
0:4cdaf9b1e7d0

File content as of revision 1:042ca9148926:

/*******************************************************************************
 * Copyright (c) 2014 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial API and implementation and/or initial documentation
 *******************************************************************************/

#if !defined(MQTTASYNC_H)
#define MQTTASYNC_H

#include "FP.h"
#include "MQTTPacket.h"
#include "stdio.h"

namespace MQTT
{


enum QoS { QOS0, QOS1, QOS2 };


struct Message
{
    enum QoS qos;
    bool retained;
    bool dup;
    unsigned short id;
    void *payload;
    size_t payloadlen;
};


class PacketId
{
public:
    PacketId();
    
    int getNext();
   
private:
    static const int MAX_PACKET_ID = 65535;
    int next;
};

typedef void (*messageHandler)(Message*);

typedef struct limits
{
	int MAX_MQTT_PACKET_SIZE; // 
	int MAX_MESSAGE_HANDLERS;  // each subscription requires a message handler
	int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
	int command_timeout_ms;
		
	limits()
	{
		MAX_MQTT_PACKET_SIZE = 100;
		MAX_MESSAGE_HANDLERS = 5;
		MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode
		command_timeout_ms = 30000;
	}
} Limits;
  

/**
 * @class Async
 * @brief non-blocking, threaded MQTT client API
 * @param Network a network class which supports send, receive
 * @param Timer a timer class with the methods: 
 */ 
template<class Network, class Timer, class Thread, class Mutex> class Async
{
    
public:    

	struct Result
	{
    	/* success or failure result data */
    	Async<Network, Timer, Thread, Mutex>* client;
		int rc;
	};

	typedef void (*resultHandler)(Result*);	
   
    Async(Network* network, const Limits limits = Limits()); 
        
    typedef struct
    {
        Async* client;
        Network* network;
    } connectionLostInfo;
    
    typedef int (*connectionLostHandlers)(connectionLostInfo*);
    
    /** Set the connection lost callback - called whenever the connection is lost and we should be connected
     *  @param clh - pointer to the callback function
     */
    void setConnectionLostHandler(connectionLostHandlers clh)
    {
        connectionLostHandler.attach(clh);
    }
    
    /** Set the default message handling callback - used for any message which does not match a subscription message handler
     *  @param mh - pointer to the callback function
     */
    void setDefaultMessageHandler(messageHandler mh)
    {
        defaultMessageHandler.attach(mh);
    }
           
    int connect(resultHandler fn, MQTTPacket_connectData* options = 0);
    
     template<class T>
    int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0);  // alternative to pass in pointer to member function
        
    int publish(resultHandler rh, const char* topic, Message* message);
    
    int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh);
    
    int unsubscribe(resultHandler rh, const char* topicFilter);
    
    int disconnect(resultHandler rh);
    
private:

    void run(void const *argument);
    int cycle(int timeout);
    int waitfor(int packet_type, Timer& atimer);
	int keepalive();
	int findFreeOperation();

    int decodePacket(int* value, int timeout);
    int readPacket(int timeout);
    int sendPacket(int length, int timeout);
	int deliverMessage(MQTTString* topic, Message* message);
    
    Thread* thread;
    Network* ipstack;
    
    Limits limits;
    
    char* buf;  
    char* readbuf;

    Timer ping_timer, connect_timer;
    unsigned int keepAliveInterval;
	bool ping_outstanding;
    
    PacketId packetid;
    
    typedef FP<void, Result*> resultHandlerFP;    
    resultHandlerFP connectHandler; 
    
    typedef FP<void, Message*> messageHandlerFP;
    struct MessageHandlers
    {
    	const char* topic;
    	messageHandlerFP fp;
    } *messageHandlers;      // Message handlers are indexed by subscription topic
    
    // how many concurrent operations should we allow?  Each one will require a function pointer
    struct Operations
    {
    	unsigned short id;
    	resultHandlerFP fp;
    	const char* topic;         // if this is a publish, store topic name in case republishing is required
    	Message* message;    // for publish, 
    	Timer timer;         // to check if the command has timed out
    } *operations;           // result handlers are indexed by packet ids

	static void threadfn(void* arg);
	
	messageHandlerFP defaultMessageHandler;
    
    typedef FP<int, connectionLostInfo*> connectionLostFP;
    
    connectionLostFP connectionLostHandler;
    
};

}


template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg)
{
    ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
}


template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits)  : limits(limits), packetid()
{
	this->thread = 0;
	this->ipstack = network;
	this->ping_timer = Timer();
	this->ping_outstanding = 0;
	   
	// How to make these memory allocations portable?  I was hoping to avoid the heap
	buf = new char[limits.MAX_MQTT_PACKET_SIZE];
	readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
	this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS];
	for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
		operations[i].id = 0;
	this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
	for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
		messageHandlers[i].topic = 0;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
{
    int sent = 0;
    
    while (sent < length)
        sent += ipstack->write(&buf[sent], length, timeout);
	if (sent == length)
	    ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet    
    return sent;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
{
    char c;
    int multiplier = 1;
    int len = 0;
	const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;

    *value = 0;
    do
    {
        int rc = MQTTPACKET_READ_ERROR;

        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
        {
            rc = MQTTPACKET_READ_ERROR; /* bad data */
            goto exit;
        }
        rc = ipstack->read(&c, 1, timeout);
        if (rc != 1)
            goto exit;
        *value += (c & 127) * multiplier;
        multiplier *= 128;
    } while ((c & 128) != 0);
exit:
    return len;
}


/**
 * If any read fails in this method, then we should disconnect from the network, as on reconnect
 * the packets can be retried. 
 * @param timeout the max time to wait for the packet read to complete, in milliseconds
 * @return the MQTT packet type, or -1 if none
 */
template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout) 
{
    int rc = -1;
    MQTTHeader header = {0};
    int len = 0;
    int rem_len = 0;

    /* 1. read the header byte.  This has the packet type in it */
    if (ipstack->read(readbuf, 1, timeout) != 1)
        goto exit;

    len = 1;
    /* 2. read the remaining length.  This is variable in itself */
    decodePacket(&rem_len, timeout);
    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */

    /* 3. read the rest of the buffer using a callback to supply the rest of the data */
    if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
        goto exit;

    header.byte = readbuf[0];
    rc = header.bits.type;
exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
{
	int rc = -1;

	// we have to find the right message handler - indexed by topic
	for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
	{
		if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
		{
			messageHandlers[i].fp(message);
			rc = 0;
			break;
		}
	}
	
	return rc;
}



template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout)
{
    /* get one piece of work off the wire and one pass through */

    // read the socket, see what work is due
    int packet_type = readPacket(timeout);
    
	int len, rc;
    switch (packet_type)
    {
        case CONNACK:
			if (this->thread)
			{
				Result res = {this, 0};
            	if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                	;
				connectHandler(&res);
				connectHandler.detach(); // only invoke the callback once
			}
			break;
        case PUBACK:
        	if (this->thread)
        		; //call resultHandler
        case SUBACK:
            break;
        case PUBLISH:
			MQTTString topicName;
			Message msg;
			rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);;
			if (msg.qos == QOS0)
				deliverMessage(&topicName, &msg);
            break;
        case PUBREC:
   	        int type, dup, mypacketid;
   	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
   	            ; 
   	        // must lock this access against the application thread, if we are multi-threaded
			len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
		    rc = sendPacket(len, timeout); // send the PUBREL packet
			if (rc != len) 
				goto exit; // there was a problem

            break;
        case PUBCOMP:
            break;
        case PINGRESP:
			ping_outstanding = false;
            break;
    }
	keepalive();
exit:
    return packet_type;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive()
{
	int rc = 0;

	if (keepAliveInterval == 0)
		goto exit;

	if (ping_timer.expired())
	{
		if (ping_outstanding)
			rc = -1;
		else
		{
			int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
			rc = sendPacket(len, 1000); // send the ping packet
			if (rc != len) 
				rc = -1; // indicate there's a problem
			else
				ping_outstanding = true;
		}
	}

exit:
	return rc;
}


template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument)
{
	while (true)
		cycle(ping_timer.left_ms());
}


// only used in single-threaded mode where one command at a time is in process
template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
{
	int rc = -1;
	
	do
    {
		if (atimer.expired()) 
			break; // we timed out
	}
	while ((rc = cycle(atimer.left_ms())) != packet_type);	
	
	return rc;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options)
{
	connect_timer.countdown(limits.command_timeout_ms);

    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
    if (options == 0)
        options = &default_options; // set default options if none were supplied
    
    this->keepAliveInterval = options->keepAliveInterval;
	ping_timer.countdown(this->keepAliveInterval);
    int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
    int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet
	if (rc != len) 
		goto exit; // there was a problem
    
    if (resultHandler == 0)     // wait until the connack is received 
    {
        // this will be a blocking call, wait for the connack
		if (waitfor(CONNACK, connect_timer) == CONNACK)
		{
        	int connack_rc = -1;
        	if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
	        	rc = connack_rc;
	    }
    }
    else
    {
        // set connect response callback function
        connectHandler.attach(resultHandler);
        
        // start background thread            
        this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
    }
    
exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation()
{
	int found = -1;
	for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
	{
		if (operations[i].id == 0)
		{
			found = i;
			break;
		}
	}
	return found;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
	int index = 0;
	if (this->thread)
		index = findFreeOperation();	
	Timer& atimer = operations[index].timer;
	
	atimer.countdown(limits.command_timeout_ms);
    MQTTString topic = {(char*)topicFilter, 0, 0};
    
    int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
    int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
	if (rc != len) 
		goto exit; // there was a problem
    
    /* wait for suback */
    if (resultHandler == 0)
    {
        // this will block
        if (waitfor(SUBACK, atimer) == SUBACK)
        {
            int count = 0, grantedQoS = -1, mypacketid;
            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                rc = grantedQoS; // 0, 1, 2 or 0x80 
            if (rc != 0x80)
            {
            	for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
				{
					if (messageHandlers[i].topic == 0)
					{
						messageHandlers[i].topic = topicFilter;
						messageHandlers[i].fp.attach(messageHandler);
						rc = 0;
						break;
					}
				}
            }
        }
    }
    else
    {
        // set subscribe response callback function
        
    }
    
exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter)
{
	int index = 0;
	if (this->thread)
		index = findFreeOperation();	
	Timer& atimer = operations[index].timer;

	atimer.countdown(limits.command_timeout_ms);
    MQTTString topic = {(char*)topicFilter, 0, 0};
    
    int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
    int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
	if (rc != len) 
		goto exit; // there was a problem
    
    // set unsubscribe response callback function
        
    
exit:
    return rc;
}


   
template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message)
{
	int index = 0;
	if (this->thread)
		index = findFreeOperation();	
	Timer& atimer = operations[index].timer;

	atimer.countdown(limits.command_timeout_ms);
    MQTTString topic = {(char*)topicName, 0, 0};

	if (message->qos == QOS1 || message->qos == QOS2)
		message->id = packetid.getNext();
    
	int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen);
    int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
	if (rc != len) 
		goto exit; // there was a problem
    
    /* wait for acks */
    if (resultHandler == 0)
    {
 		if (message->qos == QOS1)
		{
	        if (waitfor(PUBACK, atimer) == PUBACK)
    	    {
    	        int type, dup, mypacketid;
    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
    	            rc = 0; 
    	    }
		}
		else if (message->qos == QOS2)
		{
	        if (waitfor(PUBCOMP, atimer) == PUBCOMP)
	   	    {
	   	    	int type, dup, mypacketid;
            	if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
    	           	rc = 0; 
			}

		}
    }
    else
    {
        // set publish response callback function
        
    }
    
exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler)
{  
    Timer timer = Timer(limits.command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
    int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
    int rc = sendPacket(len, timer.left_ms());   // send the disconnect packet
    
    return (rc == len) ? 0 : -1;
}



#endif