Example of an AWS IoT connection and web dashboard using an STM32 Nucleo evaluation board and mbed OS.

Dependencies:   X_NUCLEO_IKS01A1 mbed FP MQTTPacket DnsQuery ATParser

Introduction

The demo targets an STM32 Nucleo board with WiFi and sensor expansion boards. The board acts as a "thing" for the AWS IoT service. It updates the thing's shadow with sensor data every second and checks for subscription messages.

Hardware Configuration

https://github.com/Klika-Tech/nucleo-aws-iot-demo/raw/master/doc/assets/device.jpg

Software Configuration

  • Import this project into the mbed online compiler
  • Find the following part of the code in the main.cpp file ...

WiFi network credentials

#include "mbed.h"
// WiFi network credential
#define SSID   ""  // Network must be visible otherwise it can't connect
#define PASSW  ""
#error "Wifi SSID & password empty"
  • ... and set SSID and PASSW to your network name and password. Do not forget to remove the "#error" directive line.
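As an illustration of the same guard, here is a minimal sketch that fails the build only while the credentials are still empty, so nothing has to be deleted after editing. It assumes a toolchain with C++11 support for static_assert, which the compiler used for this project may not provide. The values shown are placeholders.

```cpp
// Placeholder values; replace with your own network name and password.
#define SSID   "my-network"
#define PASSW  "my-password"

// sizeof of a string literal counts the terminating NUL,
// so an empty literal "" has size 1.
static_assert(sizeof(SSID) > 1,  "WiFi SSID is empty");
static_assert(sizeof(PASSW) > 1, "WiFi password is empty");
```

With empty strings the build stops with the given message. Once both defines are filled in, the checks compile away.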

Information

The Nucleo WiFi module is not the same as the one in your smartphone or laptop: it is based on a demo board. To avoid connection problems:

  1. Place the Nucleo as close to the WiFi hotspot as possible, or
  2. Turn on a mobile hotspot on your laptop and keep it as close to the device as possible.
  3. Make sure that the hotspot permits 2.4 GHz band communication.
  • Set up the backend and store the certificates by following the backend setup instructions.
  • Find the AWS_IOT_MQTT_HOST define and change it to the HTTPS endpoint shown under "Interact" in your AWS IoT thing properties.

#define AWS_IOT_MQTT_HOST              "xxxxxxxxxx.iot.us-east-1.amazonaws.com" //Use your own host.
  • Find the certificate definitions clientCRT and clientKey in the main.cpp file and change them to the ones provided by Amazon.

/**********************************************************************************************
***********************************************************************************************
				Device Identity Certificates: Modify for your AWS IoT Thing
***********************************************************************************************
***********************************************************************************************/

/****************************************
(somecode)-certificate.pem.crt - Amazon signed PEM certificate.
*****************************************/

// This client certificate is an example. Use your own instead.
const uint8_t clientCRT[] = "\
-----BEGIN CERTIFICATE-----\n\
MIIDBjCCAe6gAwIBAgIUVph856omeIxW3UPioq+UrX1DbwowDQYJKoZIhvcNAQEL\
BQAwTTFLMEkGA1UECwxCQW1hem9uIFdlYiBTZXJ2aWNlcyBPPUFtYXpvbi5jb20g\
SW5jLiBMPVNlYXR0bGUgU1Q9V2FzaGluZ3RvbiBDPVVTMB4XDTE3MDUyNTExNTEy\
OVoXDTQ5MTIzMTIzNTk1OVowgZUxCzAJBgNVBAYTAkJZMQ4wDAYDVQQIDAVNaW5z\
azEOMAwGA1UEBwwFTWluc2sxFzAVBgNVBAoMDktsaWthLVRlY2ggTExDMRcwFQYD\
VQQLDA5LbGlrYS1UZWNoIExMQzEMMAoGA1UEAwwDUm5EMSYwJAYJKoZIhvcNAQkB\
FhdtdmF0YWxldUBrbGlrYS10ZWNoLmNvbTBZMBMGByqGSM49AgEGCCqGSM49AwEH\
A0IABCJgOQJmoTBJVPfli9Hm/JVixaxkY5rtlgrYO3hSl633A2hg0P/ue0wXDbF3\
aQ0X57IRFE4k4FEbr3UXjT/IczKjYDBeMB8GA1UdIwQYMBaAFK3YzTUPlYB2Li75\
i/z8rEogr1d6MB0GA1UdDgQWBBT18HXBaXFJuAR/0SwegnxJ+pyJ6TAMBgNVHRMB\
Af8EAjAAMA4GA1UdDwEB/wQEAwIHgDANBgkqhkiG9w0BAQsFAAOCAQEAb0Ux1aH5\
RLxjrfGqXN6rPVqh8QQRS+AyBfzmaQN8HaPZMkX5WxXLvcn0A3uWlwQxPPkcZ4zf\
51GHtFFQWB4YZ8dx8mUQ0v/j7onHjCJgZ8iDgwOyKMGtnsDZWCakQw+a6cj+NrMZ\
tzhjwCzEEP6ePcbXwErI5OOzLuWns2L/JEr2wWNkokgRuS8ewr/SQ9OLWIWa2rFM\
ahPNTb3y/qBeWdjeJmhI+TOxdqIpsF8roWP25zwo/zkzCHCjXFBrL+0CA4MpxIl9\
x02i7aAhlJ6ys80lDxdeWeeQJXRKkGknP8mcmKn3iEqqJ5s1dQePj2b5d3ldatya\
wsxQBqqZXzIWEw==\
\n\
-----END CERTIFICATE-----\n";



/**********************************************************************************************
***********************************************************************************************
						Private Key: Modify for your AWS IoT Thing
***********************************************************************************************
***********************************************************************************************/

/**************************************************************************************************************
nucleo.key.pem - client key generated according to readme.
**************************************************************************************************************/

// This client key is an example. Use your own instead.
const uint8_t clientKey[] ="\
-----BEGIN EC PARAMETERS-----\n\
BggqhkjOPQMBBw==\
-----END EC PARAMETERS-----\n\
-----BEGIN EC PRIVATE KEY-----\n\
MHcCAQEEIHPRfWSC8/k/BsqDWKuP15dXsI9fGwpkTIsLZe6mIrAAoAoGCCqGSM49\
AwEHoUQDQgAEImA5AmahMElU9+WL0eb8lWLFrGRjmu2WCtg7eFKXrfcDaGDQ/+57\
TBcNsXdpDRfnshEUTiTgURuvdReNP8hzMg==\
-----END EC PRIVATE KEY-----\n";
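When pasting your own certificate and key into these arrays, it is easy to lose one of the marker lines. The following is a minimal sketch of a sanity check. It assumes a hypothetical helper named hasPemMarkers, which is not part of this project or of any TLS library. It only confirms that a BEGIN marker is followed later by the matching END marker and does not validate the Base64 body.

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Hypothetical helper: returns true when `pem` contains
// "-----BEGIN <label>-----" followed later by "-----END <label>-----".
bool hasPemMarkers(const char* pem, const char* label)
{
    assert(pem != nullptr && label != nullptr);

    const std::string begin = std::string("-----BEGIN ") + label + "-----";
    const std::string end   = std::string("-----END ") + label + "-----";

    const char* b = std::strstr(pem, begin.c_str());
    if (b == nullptr)
        return false;

    // The END marker must appear after the BEGIN marker.
    return std::strstr(b + begin.size(), end.c_str()) != nullptr;
}
```

Since both arrays are declared as uint8_t, a call would need a cast, for example hasPemMarkers(reinterpret_cast<const char*>(clientCRT), "CERTIFICATE") and the same with clientKey and "EC PRIVATE KEY".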

Build and Check

  1. Plug your board into a USB port on your PC. A USB disk drive and a USB COM port should appear in your system.
  2. Open any serial console, connect it to your USB serial port, and set the speed to 115200 baud.
  3. Compile this project and save the .bin file to the USB disk drive.
  4. After the board resets you should see the following log in the serial console:

X-NUCLEO-IDW01M1 mbed Application

connecting to AP
LOG:   int main() L#361 Connected to WiFI.
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#186 =====================================
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#187 Connecting WiFi.
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#188 Nucleo IP ADDRESS: X.X.X.X
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#189 Nucleo MAC ADDRESS: 00:11:22:33:44:55
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#190 Server Hostname: xxxxxxxx.iot.us-east-1.amazonaws.com port: 8883
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#191 Client ID: Nucleo
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#194 =====================================
LOG:   int MQTTSocket::getNTPtime(int) L#58 Success receiving time from ntp server. Tick from 1 Jan 1970 is equal to 1505399292.
--->TCP Connected
--->MQTT Connected
--->>>MQTT subscribed to: Nucleo/test
Length - 245, Publishing {"state": {"reported": {"temperature": 23.690001, "humidity": 98.190002, "pressure": 982.869141, "accelerometer": [-0.009000, 0.030000, 0.971000], "gyroscope": [0.420000, -2.660000, 1.750000], "magnetometer": [-3.600000, -7.100000, 53.300000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.660000, "humidity": 98.010002, "pressure": 982.770264, "accelerometer": [-0.009000, 0.030000, 0.971000], "gyroscope": [0.770000, -2.310000, 1.470000], "magnetometer": [-3.100000, -8.300000, 54.200000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.670000, "humidity": 98.129997, "pressure": 982.724121, "accelerometer": [-0.008000, 0.029000, 0.971000], "gyroscope": [0.630000, -2.380000, 1.400000], "magnetometer": [-3.100000, -7.900000, 53.400000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.690001, "humidity": 98.019997, "pressure": 982.840088, "accelerometer": [-0.009000, 0.030000, 0.972000], "gyroscope": [0.700000, -2.450000, 1.540000], "magnetometer": [-3.700000, -7.900000, 53.400000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.709999, "humidity": 98.040001, "pressure": 982.828613, "accelerometer": [-0.009000, 0.030000, 0.971000], "gyroscope": [0.630000, -2.520000, 1.470000], "magnetometer": [-2.900000, -7.400000, 52.400000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.719999, "humidity": 97.860001, "pressure": 982.917236, "accelerometer": [-0.026000, 0.103000, 0.891000], "gyroscope": [1.050000, -2.310000, 1.260000], "magnetometer": [-3.300000, -7.100000, 53.500000]}}}

Information

The device connection state can be checked via the green LED on the board. A green light means that the device is connected and transferring data to the cloud.

  1. Configure and start your dashboard using the instructions and corresponding sources from GitHub.
  2. Use the blue button to set markers on the charts.
  3. Use the AWS IoT console MQTT client to test the device subscription to "Nucleo/test". Publish any message to this topic and check the serial port output.
  4. PROFIT!
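When handling a message received on "Nucleo/test", note that an MQTT payload is a byte buffer with an explicit length and no terminating NUL. The following is a minimal sketch of turning it into a printable string. IncomingMessage is a local stand-in that mirrors only the payload and payloadlen fields of the Message struct in the listing below, and payloadToString is a hypothetical helper, not a library function.

```cpp
#include <cstddef>
#include <string>

// Local stand-in with the two fields used here; an assumption for
// illustration, not the library type itself.
struct IncomingMessage {
    void*       payload;
    std::size_t payloadlen;
};

// Copies exactly payloadlen bytes, so no terminating NUL is required
// in the received buffer.
std::string payloadToString(const IncomingMessage& msg)
{
    if (msg.payload == nullptr || msg.payloadlen == 0)
        return std::string();
    return std::string(static_cast<const char*>(msg.payload), msg.payloadlen);
}
```

Printing the payload directly with a "%s" format would read past the end of the buffer, which is why the length is passed explicitly.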
Committer: PavelSavyhin
Date: Thu Oct 19 11:36:41 2017 +0000
Revision: 1:042ca9148926
Parent: 0:4cdaf9b1e7d0
Connection times are optimized and logging is extended.

Who changed what in which revision?

User | Revision | Line number | New contents of line
PavelSavyhin 0:4cdaf9b1e7d0 1 /*******************************************************************************
PavelSavyhin 0:4cdaf9b1e7d0 2 * Copyright (c) 2014 IBM Corp.
PavelSavyhin 0:4cdaf9b1e7d0 3 *
PavelSavyhin 0:4cdaf9b1e7d0 4 * All rights reserved. This program and the accompanying materials
PavelSavyhin 0:4cdaf9b1e7d0 5 * are made available under the terms of the Eclipse Public License v1.0
PavelSavyhin 0:4cdaf9b1e7d0 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
PavelSavyhin 0:4cdaf9b1e7d0 7 *
PavelSavyhin 0:4cdaf9b1e7d0 8 * The Eclipse Public License is available at
PavelSavyhin 0:4cdaf9b1e7d0 9 * http://www.eclipse.org/legal/epl-v10.html
PavelSavyhin 0:4cdaf9b1e7d0 10 * and the Eclipse Distribution License is available at
PavelSavyhin 0:4cdaf9b1e7d0 11 * http://www.eclipse.org/org/documents/edl-v10.php.
PavelSavyhin 0:4cdaf9b1e7d0 12 *
PavelSavyhin 0:4cdaf9b1e7d0 13 * Contributors:
PavelSavyhin 0:4cdaf9b1e7d0 14 * Ian Craggs - initial API and implementation and/or initial documentation
PavelSavyhin 0:4cdaf9b1e7d0 15 *******************************************************************************/
PavelSavyhin 0:4cdaf9b1e7d0 16
PavelSavyhin 0:4cdaf9b1e7d0 17 #if !defined(MQTTASYNC_H)
PavelSavyhin 0:4cdaf9b1e7d0 18 #define MQTTASYNC_H
PavelSavyhin 0:4cdaf9b1e7d0 19
PavelSavyhin 0:4cdaf9b1e7d0 20 #include "FP.h"
PavelSavyhin 0:4cdaf9b1e7d0 21 #include "MQTTPacket.h"
PavelSavyhin 0:4cdaf9b1e7d0 22 #include "stdio.h"
PavelSavyhin 0:4cdaf9b1e7d0 23
PavelSavyhin 0:4cdaf9b1e7d0 24 namespace MQTT
PavelSavyhin 0:4cdaf9b1e7d0 25 {
PavelSavyhin 0:4cdaf9b1e7d0 26
PavelSavyhin 0:4cdaf9b1e7d0 27
PavelSavyhin 0:4cdaf9b1e7d0 28 enum QoS { QOS0, QOS1, QOS2 };
PavelSavyhin 0:4cdaf9b1e7d0 29
PavelSavyhin 0:4cdaf9b1e7d0 30
PavelSavyhin 0:4cdaf9b1e7d0 31 struct Message
PavelSavyhin 0:4cdaf9b1e7d0 32 {
PavelSavyhin 0:4cdaf9b1e7d0 33 enum QoS qos;
PavelSavyhin 0:4cdaf9b1e7d0 34 bool retained;
PavelSavyhin 0:4cdaf9b1e7d0 35 bool dup;
PavelSavyhin 0:4cdaf9b1e7d0 36 unsigned short id;
PavelSavyhin 0:4cdaf9b1e7d0 37 void *payload;
PavelSavyhin 0:4cdaf9b1e7d0 38 size_t payloadlen;
PavelSavyhin 0:4cdaf9b1e7d0 39 };
PavelSavyhin 0:4cdaf9b1e7d0 40
PavelSavyhin 0:4cdaf9b1e7d0 41
PavelSavyhin 0:4cdaf9b1e7d0 42 class PacketId
PavelSavyhin 0:4cdaf9b1e7d0 43 {
PavelSavyhin 0:4cdaf9b1e7d0 44 public:
PavelSavyhin 0:4cdaf9b1e7d0 45 PacketId();
PavelSavyhin 0:4cdaf9b1e7d0 46
PavelSavyhin 0:4cdaf9b1e7d0 47 int getNext();
PavelSavyhin 0:4cdaf9b1e7d0 48
PavelSavyhin 0:4cdaf9b1e7d0 49 private:
PavelSavyhin 0:4cdaf9b1e7d0 50 static const int MAX_PACKET_ID = 65535;
PavelSavyhin 0:4cdaf9b1e7d0 51 int next;
PavelSavyhin 0:4cdaf9b1e7d0 52 };
PavelSavyhin 0:4cdaf9b1e7d0 53
PavelSavyhin 0:4cdaf9b1e7d0 54 typedef void (*messageHandler)(Message*);
PavelSavyhin 0:4cdaf9b1e7d0 55
PavelSavyhin 0:4cdaf9b1e7d0 56 typedef struct limits
PavelSavyhin 0:4cdaf9b1e7d0 57 {
PavelSavyhin 0:4cdaf9b1e7d0 58 int MAX_MQTT_PACKET_SIZE; //
PavelSavyhin 0:4cdaf9b1e7d0 59 int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler
PavelSavyhin 0:4cdaf9b1e7d0 60 int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
PavelSavyhin 0:4cdaf9b1e7d0 61 int command_timeout_ms;
PavelSavyhin 0:4cdaf9b1e7d0 62
PavelSavyhin 0:4cdaf9b1e7d0 63 limits()
PavelSavyhin 0:4cdaf9b1e7d0 64 {
PavelSavyhin 0:4cdaf9b1e7d0 65 MAX_MQTT_PACKET_SIZE = 100;
PavelSavyhin 0:4cdaf9b1e7d0 66 MAX_MESSAGE_HANDLERS = 5;
PavelSavyhin 0:4cdaf9b1e7d0 67 MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode
PavelSavyhin 0:4cdaf9b1e7d0 68 command_timeout_ms = 30000;
PavelSavyhin 0:4cdaf9b1e7d0 69 }
PavelSavyhin 0:4cdaf9b1e7d0 70 } Limits;
PavelSavyhin 0:4cdaf9b1e7d0 71
PavelSavyhin 0:4cdaf9b1e7d0 72
PavelSavyhin 0:4cdaf9b1e7d0 73 /**
PavelSavyhin 0:4cdaf9b1e7d0 74 * @class Async
PavelSavyhin 0:4cdaf9b1e7d0 75 * @brief non-blocking, threaded MQTT client API
PavelSavyhin 0:4cdaf9b1e7d0 76 * @param Network a network class which supports send, receive
PavelSavyhin 0:4cdaf9b1e7d0 77 * @param Timer a timer class with the methods:
PavelSavyhin 0:4cdaf9b1e7d0 78 */
PavelSavyhin 0:4cdaf9b1e7d0 79 template<class Network, class Timer, class Thread, class Mutex> class Async
PavelSavyhin 0:4cdaf9b1e7d0 80 {
PavelSavyhin 0:4cdaf9b1e7d0 81
PavelSavyhin 0:4cdaf9b1e7d0 82 public:
PavelSavyhin 0:4cdaf9b1e7d0 83
PavelSavyhin 0:4cdaf9b1e7d0 84 struct Result
PavelSavyhin 0:4cdaf9b1e7d0 85 {
PavelSavyhin 0:4cdaf9b1e7d0 86 /* success or failure result data */
PavelSavyhin 0:4cdaf9b1e7d0 87 Async<Network, Timer, Thread, Mutex>* client;
PavelSavyhin 0:4cdaf9b1e7d0 88 int rc;
PavelSavyhin 0:4cdaf9b1e7d0 89 };
PavelSavyhin 0:4cdaf9b1e7d0 90
PavelSavyhin 0:4cdaf9b1e7d0 91 typedef void (*resultHandler)(Result*);
PavelSavyhin 0:4cdaf9b1e7d0 92
PavelSavyhin 0:4cdaf9b1e7d0 93 Async(Network* network, const Limits limits = Limits());
PavelSavyhin 0:4cdaf9b1e7d0 94
PavelSavyhin 0:4cdaf9b1e7d0 95 typedef struct
PavelSavyhin 0:4cdaf9b1e7d0 96 {
PavelSavyhin 0:4cdaf9b1e7d0 97 Async* client;
PavelSavyhin 0:4cdaf9b1e7d0 98 Network* network;
PavelSavyhin 0:4cdaf9b1e7d0 99 } connectionLostInfo;
PavelSavyhin 0:4cdaf9b1e7d0 100
PavelSavyhin 0:4cdaf9b1e7d0 101 typedef int (*connectionLostHandlers)(connectionLostInfo*);
PavelSavyhin 0:4cdaf9b1e7d0 102
PavelSavyhin 0:4cdaf9b1e7d0 103 /** Set the connection lost callback - called whenever the connection is lost and we should be connected
PavelSavyhin 0:4cdaf9b1e7d0 104 * @param clh - pointer to the callback function
PavelSavyhin 0:4cdaf9b1e7d0 105 */
PavelSavyhin 0:4cdaf9b1e7d0 106 void setConnectionLostHandler(connectionLostHandlers clh)
PavelSavyhin 0:4cdaf9b1e7d0 107 {
PavelSavyhin 0:4cdaf9b1e7d0 108 connectionLostHandler.attach(clh);
PavelSavyhin 0:4cdaf9b1e7d0 109 }
PavelSavyhin 0:4cdaf9b1e7d0 110
PavelSavyhin 0:4cdaf9b1e7d0 111 /** Set the default message handling callback - used for any message which does not match a subscription message handler
PavelSavyhin 0:4cdaf9b1e7d0 112 * @param mh - pointer to the callback function
PavelSavyhin 0:4cdaf9b1e7d0 113 */
PavelSavyhin 0:4cdaf9b1e7d0 114 void setDefaultMessageHandler(messageHandler mh)
PavelSavyhin 0:4cdaf9b1e7d0 115 {
PavelSavyhin 0:4cdaf9b1e7d0 116 defaultMessageHandler.attach(mh);
PavelSavyhin 0:4cdaf9b1e7d0 117 }
PavelSavyhin 0:4cdaf9b1e7d0 118
PavelSavyhin 0:4cdaf9b1e7d0 119 int connect(resultHandler fn, MQTTPacket_connectData* options = 0);
PavelSavyhin 0:4cdaf9b1e7d0 120
PavelSavyhin 0:4cdaf9b1e7d0 121 template<class T>
PavelSavyhin 0:4cdaf9b1e7d0 122 int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0); // alternative to pass in pointer to member function
PavelSavyhin 0:4cdaf9b1e7d0 123
PavelSavyhin 0:4cdaf9b1e7d0 124 int publish(resultHandler rh, const char* topic, Message* message);
PavelSavyhin 0:4cdaf9b1e7d0 125
PavelSavyhin 0:4cdaf9b1e7d0 126 int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh);
PavelSavyhin 0:4cdaf9b1e7d0 127
PavelSavyhin 0:4cdaf9b1e7d0 128 int unsubscribe(resultHandler rh, const char* topicFilter);
PavelSavyhin 0:4cdaf9b1e7d0 129
PavelSavyhin 0:4cdaf9b1e7d0 130 int disconnect(resultHandler rh);
PavelSavyhin 0:4cdaf9b1e7d0 131
PavelSavyhin 0:4cdaf9b1e7d0 132 private:
PavelSavyhin 0:4cdaf9b1e7d0 133
PavelSavyhin 0:4cdaf9b1e7d0 134 void run(void const *argument);
PavelSavyhin 0:4cdaf9b1e7d0 135 int cycle(int timeout);
PavelSavyhin 0:4cdaf9b1e7d0 136 int waitfor(int packet_type, Timer& atimer);
PavelSavyhin 0:4cdaf9b1e7d0 137 int keepalive();
PavelSavyhin 0:4cdaf9b1e7d0 138 int findFreeOperation();
PavelSavyhin 0:4cdaf9b1e7d0 139
PavelSavyhin 0:4cdaf9b1e7d0 140 int decodePacket(int* value, int timeout);
PavelSavyhin 0:4cdaf9b1e7d0 141 int readPacket(int timeout);
PavelSavyhin 0:4cdaf9b1e7d0 142 int sendPacket(int length, int timeout);
PavelSavyhin 0:4cdaf9b1e7d0 143 int deliverMessage(MQTTString* topic, Message* message);
PavelSavyhin 0:4cdaf9b1e7d0 144
PavelSavyhin 0:4cdaf9b1e7d0 145 Thread* thread;
PavelSavyhin 0:4cdaf9b1e7d0 146 Network* ipstack;
PavelSavyhin 0:4cdaf9b1e7d0 147
PavelSavyhin 0:4cdaf9b1e7d0 148 Limits limits;
PavelSavyhin 0:4cdaf9b1e7d0 149
PavelSavyhin 0:4cdaf9b1e7d0 150 char* buf;
PavelSavyhin 0:4cdaf9b1e7d0 151 char* readbuf;
PavelSavyhin 0:4cdaf9b1e7d0 152
PavelSavyhin 0:4cdaf9b1e7d0 153 Timer ping_timer, connect_timer;
PavelSavyhin 0:4cdaf9b1e7d0 154 unsigned int keepAliveInterval;
PavelSavyhin 0:4cdaf9b1e7d0 155 bool ping_outstanding;
PavelSavyhin 0:4cdaf9b1e7d0 156
PavelSavyhin 0:4cdaf9b1e7d0 157 PacketId packetid;
PavelSavyhin 0:4cdaf9b1e7d0 158
PavelSavyhin 0:4cdaf9b1e7d0 159 typedef FP<void, Result*> resultHandlerFP;
PavelSavyhin 0:4cdaf9b1e7d0 160 resultHandlerFP connectHandler;
PavelSavyhin 0:4cdaf9b1e7d0 161
PavelSavyhin 0:4cdaf9b1e7d0 162 typedef FP<void, Message*> messageHandlerFP;
PavelSavyhin 0:4cdaf9b1e7d0 163 struct MessageHandlers
PavelSavyhin 0:4cdaf9b1e7d0 164 {
PavelSavyhin 0:4cdaf9b1e7d0 165 const char* topic;
PavelSavyhin 0:4cdaf9b1e7d0 166 messageHandlerFP fp;
PavelSavyhin 0:4cdaf9b1e7d0 167 } *messageHandlers; // Message handlers are indexed by subscription topic
PavelSavyhin 0:4cdaf9b1e7d0 168
PavelSavyhin 0:4cdaf9b1e7d0 169 // how many concurrent operations should we allow? Each one will require a function pointer
PavelSavyhin 0:4cdaf9b1e7d0 170 struct Operations
PavelSavyhin 0:4cdaf9b1e7d0 171 {
PavelSavyhin 0:4cdaf9b1e7d0 172 unsigned short id;
PavelSavyhin 0:4cdaf9b1e7d0 173 resultHandlerFP fp;
PavelSavyhin 0:4cdaf9b1e7d0 174 const char* topic; // if this is a publish, store topic name in case republishing is required
PavelSavyhin 0:4cdaf9b1e7d0 175 Message* message; // for publish,
PavelSavyhin 0:4cdaf9b1e7d0 176 Timer timer; // to check if the command has timed out
PavelSavyhin 0:4cdaf9b1e7d0 177 } *operations; // result handlers are indexed by packet ids
PavelSavyhin 0:4cdaf9b1e7d0 178
PavelSavyhin 0:4cdaf9b1e7d0 179 static void threadfn(void* arg);
PavelSavyhin 0:4cdaf9b1e7d0 180
PavelSavyhin 0:4cdaf9b1e7d0 181 messageHandlerFP defaultMessageHandler;
PavelSavyhin 0:4cdaf9b1e7d0 182
PavelSavyhin 0:4cdaf9b1e7d0 183 typedef FP<int, connectionLostInfo*> connectionLostFP;
PavelSavyhin 0:4cdaf9b1e7d0 184
PavelSavyhin 0:4cdaf9b1e7d0 185 connectionLostFP connectionLostHandler;
PavelSavyhin 0:4cdaf9b1e7d0 186
PavelSavyhin 0:4cdaf9b1e7d0 187 };
PavelSavyhin 0:4cdaf9b1e7d0 188
PavelSavyhin 0:4cdaf9b1e7d0 189 }
PavelSavyhin 0:4cdaf9b1e7d0 190
PavelSavyhin 0:4cdaf9b1e7d0 191
PavelSavyhin 0:4cdaf9b1e7d0 192 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg)
PavelSavyhin 0:4cdaf9b1e7d0 193 {
PavelSavyhin 0:4cdaf9b1e7d0 194 ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
PavelSavyhin 0:4cdaf9b1e7d0 195 }
PavelSavyhin 0:4cdaf9b1e7d0 196
PavelSavyhin 0:4cdaf9b1e7d0 197
PavelSavyhin 0:4cdaf9b1e7d0 198 template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid()
PavelSavyhin 0:4cdaf9b1e7d0 199 {
PavelSavyhin 0:4cdaf9b1e7d0 200 this->thread = 0;
PavelSavyhin 0:4cdaf9b1e7d0 201 this->ipstack = network;
PavelSavyhin 0:4cdaf9b1e7d0 202 this->ping_timer = Timer();
PavelSavyhin 0:4cdaf9b1e7d0 203 this->ping_outstanding = 0;
PavelSavyhin 0:4cdaf9b1e7d0 204
PavelSavyhin 0:4cdaf9b1e7d0 205 // How to make these memory allocations portable? I was hoping to avoid the heap
PavelSavyhin 0:4cdaf9b1e7d0 206 buf = new char[limits.MAX_MQTT_PACKET_SIZE];
PavelSavyhin 0:4cdaf9b1e7d0 207 readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
PavelSavyhin 0:4cdaf9b1e7d0 208 this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS];
PavelSavyhin 0:4cdaf9b1e7d0 209 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 210 operations[i].id = 0;
PavelSavyhin 0:4cdaf9b1e7d0 211 this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
PavelSavyhin 0:4cdaf9b1e7d0 212 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 213 messageHandlers[i].topic = 0;
PavelSavyhin 0:4cdaf9b1e7d0 214 }
PavelSavyhin 0:4cdaf9b1e7d0 215
PavelSavyhin 0:4cdaf9b1e7d0 216
PavelSavyhin 0:4cdaf9b1e7d0 217 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
PavelSavyhin 0:4cdaf9b1e7d0 218 {
PavelSavyhin 0:4cdaf9b1e7d0 219 int sent = 0;
PavelSavyhin 0:4cdaf9b1e7d0 220
PavelSavyhin 0:4cdaf9b1e7d0 221 while (sent < length)
PavelSavyhin 0:4cdaf9b1e7d0 222 sent += ipstack->write(&buf[sent], length - sent, timeout); // write only the bytes not yet sent
PavelSavyhin 0:4cdaf9b1e7d0 223 if (sent == length)
PavelSavyhin 0:4cdaf9b1e7d0 224 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
PavelSavyhin 0:4cdaf9b1e7d0 225 return sent;
PavelSavyhin 0:4cdaf9b1e7d0 226 }
PavelSavyhin 0:4cdaf9b1e7d0 227
PavelSavyhin 0:4cdaf9b1e7d0 228
PavelSavyhin 0:4cdaf9b1e7d0 229 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
PavelSavyhin 0:4cdaf9b1e7d0 230 {
PavelSavyhin 0:4cdaf9b1e7d0 231 char c;
PavelSavyhin 0:4cdaf9b1e7d0 232 int multiplier = 1;
PavelSavyhin 0:4cdaf9b1e7d0 233 int len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 234 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
PavelSavyhin 0:4cdaf9b1e7d0 235
PavelSavyhin 0:4cdaf9b1e7d0 236 *value = 0;
PavelSavyhin 0:4cdaf9b1e7d0 237 do
PavelSavyhin 0:4cdaf9b1e7d0 238 {
PavelSavyhin 0:4cdaf9b1e7d0 239 int rc = MQTTPACKET_READ_ERROR;
PavelSavyhin 0:4cdaf9b1e7d0 240
PavelSavyhin 0:4cdaf9b1e7d0 241 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
PavelSavyhin 0:4cdaf9b1e7d0 242 {
PavelSavyhin 0:4cdaf9b1e7d0 243 rc = MQTTPACKET_READ_ERROR; /* bad data */
PavelSavyhin 0:4cdaf9b1e7d0 244 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 245 }
PavelSavyhin 0:4cdaf9b1e7d0 246 rc = ipstack->read(&c, 1, timeout);
PavelSavyhin 0:4cdaf9b1e7d0 247 if (rc != 1)
PavelSavyhin 0:4cdaf9b1e7d0 248 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 249 *value += (c & 127) * multiplier;
PavelSavyhin 0:4cdaf9b1e7d0 250 multiplier *= 128;
PavelSavyhin 0:4cdaf9b1e7d0 251 } while ((c & 128) != 0);
PavelSavyhin 0:4cdaf9b1e7d0 252 exit:
PavelSavyhin 0:4cdaf9b1e7d0 253 return len;
PavelSavyhin 0:4cdaf9b1e7d0 254 }
PavelSavyhin 0:4cdaf9b1e7d0 255
PavelSavyhin 0:4cdaf9b1e7d0 256
PavelSavyhin 0:4cdaf9b1e7d0 257 /**
PavelSavyhin 0:4cdaf9b1e7d0 258 * If any read fails in this method, then we should disconnect from the network, as on reconnect
PavelSavyhin 0:4cdaf9b1e7d0 259 * the packets can be retried.
PavelSavyhin 0:4cdaf9b1e7d0 260 * @param timeout the max time to wait for the packet read to complete, in milliseconds
PavelSavyhin 0:4cdaf9b1e7d0 261 * @return the MQTT packet type, or -1 if none
PavelSavyhin 0:4cdaf9b1e7d0 262 */
PavelSavyhin 0:4cdaf9b1e7d0 263 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout)
PavelSavyhin 0:4cdaf9b1e7d0 264 {
PavelSavyhin 0:4cdaf9b1e7d0 265 int rc = -1;
PavelSavyhin 0:4cdaf9b1e7d0 266 MQTTHeader header = {0};
PavelSavyhin 0:4cdaf9b1e7d0 267 int len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 268 int rem_len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 269
PavelSavyhin 0:4cdaf9b1e7d0 270 /* 1. read the header byte. This has the packet type in it */
PavelSavyhin 0:4cdaf9b1e7d0 271 if (ipstack->read(readbuf, 1, timeout) != 1)
PavelSavyhin 0:4cdaf9b1e7d0 272 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 273
PavelSavyhin 0:4cdaf9b1e7d0 274 len = 1;
PavelSavyhin 0:4cdaf9b1e7d0 275 /* 2. read the remaining length. This is variable in itself */
PavelSavyhin 0:4cdaf9b1e7d0 276 decodePacket(&rem_len, timeout);
PavelSavyhin 0:4cdaf9b1e7d0 277 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
PavelSavyhin 0:4cdaf9b1e7d0 278
PavelSavyhin 0:4cdaf9b1e7d0 279 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
PavelSavyhin 0:4cdaf9b1e7d0 280 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
PavelSavyhin 0:4cdaf9b1e7d0 281 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 282
PavelSavyhin 0:4cdaf9b1e7d0 283 header.byte = readbuf[0];
PavelSavyhin 0:4cdaf9b1e7d0 284 rc = header.bits.type;
PavelSavyhin 0:4cdaf9b1e7d0 285 exit:
PavelSavyhin 0:4cdaf9b1e7d0 286 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 287 }
PavelSavyhin 0:4cdaf9b1e7d0 288
PavelSavyhin 0:4cdaf9b1e7d0 289
PavelSavyhin 0:4cdaf9b1e7d0 290 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
PavelSavyhin 0:4cdaf9b1e7d0 291 {
PavelSavyhin 0:4cdaf9b1e7d0 292 int rc = -1;
PavelSavyhin 0:4cdaf9b1e7d0 293
PavelSavyhin 0:4cdaf9b1e7d0 294 // we have to find the right message handler - indexed by topic
PavelSavyhin 0:4cdaf9b1e7d0 295 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 296 {
PavelSavyhin 0:4cdaf9b1e7d0 297 if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
PavelSavyhin 0:4cdaf9b1e7d0 298 {
PavelSavyhin 0:4cdaf9b1e7d0 299 messageHandlers[i].fp(message);
PavelSavyhin 0:4cdaf9b1e7d0 300 rc = 0;
PavelSavyhin 0:4cdaf9b1e7d0 301 break;
PavelSavyhin 0:4cdaf9b1e7d0 302 }
PavelSavyhin 0:4cdaf9b1e7d0 303 }
PavelSavyhin 0:4cdaf9b1e7d0 304
PavelSavyhin 0:4cdaf9b1e7d0 305 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 306 }
PavelSavyhin 0:4cdaf9b1e7d0 307
PavelSavyhin 0:4cdaf9b1e7d0 308
PavelSavyhin 0:4cdaf9b1e7d0 309
PavelSavyhin 0:4cdaf9b1e7d0 310 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout)
PavelSavyhin 0:4cdaf9b1e7d0 311 {
PavelSavyhin 0:4cdaf9b1e7d0 312 /* get one piece of work off the wire and one pass through */
PavelSavyhin 0:4cdaf9b1e7d0 313
PavelSavyhin 0:4cdaf9b1e7d0 314 // read the socket, see what work is due
PavelSavyhin 0:4cdaf9b1e7d0 315 int packet_type = readPacket(timeout);
PavelSavyhin 0:4cdaf9b1e7d0 316
PavelSavyhin 0:4cdaf9b1e7d0 317 int len, rc;
PavelSavyhin 0:4cdaf9b1e7d0 318 switch (packet_type)
PavelSavyhin 0:4cdaf9b1e7d0 319 {
PavelSavyhin 0:4cdaf9b1e7d0 320 case CONNACK:
PavelSavyhin 0:4cdaf9b1e7d0 321 if (this->thread)
PavelSavyhin 0:4cdaf9b1e7d0 322 {
PavelSavyhin 0:4cdaf9b1e7d0 323 Result res = {this, 0};
PavelSavyhin 0:4cdaf9b1e7d0 324 if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
PavelSavyhin 0:4cdaf9b1e7d0 325 ;
PavelSavyhin 0:4cdaf9b1e7d0 326 connectHandler(&res);
PavelSavyhin 0:4cdaf9b1e7d0 327 connectHandler.detach(); // only invoke the callback once
PavelSavyhin 0:4cdaf9b1e7d0 328 }
PavelSavyhin 0:4cdaf9b1e7d0 329 break;
PavelSavyhin 0:4cdaf9b1e7d0 330 case PUBACK:
PavelSavyhin 0:4cdaf9b1e7d0 331 if (this->thread)
PavelSavyhin 0:4cdaf9b1e7d0 332 ; //call resultHandler
PavelSavyhin 0:4cdaf9b1e7d0 333 case SUBACK:
PavelSavyhin 0:4cdaf9b1e7d0 334 break;
PavelSavyhin 0:4cdaf9b1e7d0 335 case PUBLISH:
PavelSavyhin 0:4cdaf9b1e7d0 336 MQTTString topicName;
PavelSavyhin 0:4cdaf9b1e7d0 337 Message msg;
PavelSavyhin 0:4cdaf9b1e7d0 338 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
PavelSavyhin 0:4cdaf9b1e7d0 339 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);
PavelSavyhin 0:4cdaf9b1e7d0 340 if (msg.qos == QOS0)
PavelSavyhin 0:4cdaf9b1e7d0 341 deliverMessage(&topicName, &msg);
PavelSavyhin 0:4cdaf9b1e7d0 342 break;
PavelSavyhin 0:4cdaf9b1e7d0 343 case PUBREC:
PavelSavyhin 0:4cdaf9b1e7d0 344 int type, dup, mypacketid;
PavelSavyhin 0:4cdaf9b1e7d0 345 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
PavelSavyhin 0:4cdaf9b1e7d0 346 ;
PavelSavyhin 0:4cdaf9b1e7d0 347 // must lock this access against the application thread, if we are multi-threaded
PavelSavyhin 0:4cdaf9b1e7d0 348 len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
PavelSavyhin 0:4cdaf9b1e7d0 349 rc = sendPacket(len, timeout); // send the PUBREL packet
PavelSavyhin 0:4cdaf9b1e7d0 350 if (rc != len)
PavelSavyhin 0:4cdaf9b1e7d0 351 goto exit; // there was a problem
PavelSavyhin 0:4cdaf9b1e7d0 352
PavelSavyhin 0:4cdaf9b1e7d0 353 break;
PavelSavyhin 0:4cdaf9b1e7d0 354 case PUBCOMP:
PavelSavyhin 0:4cdaf9b1e7d0 355 break;
PavelSavyhin 0:4cdaf9b1e7d0 356 case PINGRESP:
PavelSavyhin 0:4cdaf9b1e7d0 357 ping_outstanding = false;
PavelSavyhin 0:4cdaf9b1e7d0 358 break;
PavelSavyhin 0:4cdaf9b1e7d0 359 }
PavelSavyhin 0:4cdaf9b1e7d0 360 keepalive();
PavelSavyhin 0:4cdaf9b1e7d0 361 exit:
PavelSavyhin 0:4cdaf9b1e7d0 362 return packet_type;
PavelSavyhin 0:4cdaf9b1e7d0 363 }
PavelSavyhin 0:4cdaf9b1e7d0 364
PavelSavyhin 0:4cdaf9b1e7d0 365
PavelSavyhin 0:4cdaf9b1e7d0 366 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive()
PavelSavyhin 0:4cdaf9b1e7d0 367 {
PavelSavyhin 0:4cdaf9b1e7d0 368 int rc = 0;
PavelSavyhin 0:4cdaf9b1e7d0 369
PavelSavyhin 0:4cdaf9b1e7d0 370 if (keepAliveInterval == 0)
PavelSavyhin 0:4cdaf9b1e7d0 371 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 372
PavelSavyhin 0:4cdaf9b1e7d0 373 if (ping_timer.expired())
PavelSavyhin 0:4cdaf9b1e7d0 374 {
PavelSavyhin 0:4cdaf9b1e7d0 375 if (ping_outstanding)
PavelSavyhin 0:4cdaf9b1e7d0 376 rc = -1;
PavelSavyhin 0:4cdaf9b1e7d0 377 else
PavelSavyhin 0:4cdaf9b1e7d0 378 {
PavelSavyhin 0:4cdaf9b1e7d0 379 int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
PavelSavyhin 0:4cdaf9b1e7d0 380 rc = sendPacket(len, 1000); // send the ping packet
PavelSavyhin 0:4cdaf9b1e7d0 381 if (rc != len)
PavelSavyhin 0:4cdaf9b1e7d0 382 rc = -1; // indicate there's a problem
PavelSavyhin 0:4cdaf9b1e7d0 383 else
PavelSavyhin 0:4cdaf9b1e7d0 384 ping_outstanding = true;
PavelSavyhin 0:4cdaf9b1e7d0 385 }
PavelSavyhin 0:4cdaf9b1e7d0 386 }
PavelSavyhin 0:4cdaf9b1e7d0 387
PavelSavyhin 0:4cdaf9b1e7d0 388 exit:
PavelSavyhin 0:4cdaf9b1e7d0 389 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 390 }
PavelSavyhin 0:4cdaf9b1e7d0 391
PavelSavyhin 0:4cdaf9b1e7d0 392
PavelSavyhin 0:4cdaf9b1e7d0 393 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument)
PavelSavyhin 0:4cdaf9b1e7d0 394 {
PavelSavyhin 0:4cdaf9b1e7d0 395 while (true)
PavelSavyhin 0:4cdaf9b1e7d0 396 cycle(ping_timer.left_ms());
PavelSavyhin 0:4cdaf9b1e7d0 397 }
PavelSavyhin 0:4cdaf9b1e7d0 398
PavelSavyhin 0:4cdaf9b1e7d0 399
PavelSavyhin 0:4cdaf9b1e7d0 400 // only used in single-threaded mode where one command at a time is in process
PavelSavyhin 0:4cdaf9b1e7d0 401 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
PavelSavyhin 0:4cdaf9b1e7d0 402 {
PavelSavyhin 0:4cdaf9b1e7d0 403 int rc = -1;
PavelSavyhin 0:4cdaf9b1e7d0 404
PavelSavyhin 0:4cdaf9b1e7d0 405 do
PavelSavyhin 0:4cdaf9b1e7d0 406 {
PavelSavyhin 0:4cdaf9b1e7d0 407 if (atimer.expired())
PavelSavyhin 0:4cdaf9b1e7d0 408 break; // we timed out
PavelSavyhin 0:4cdaf9b1e7d0 409 }
PavelSavyhin 0:4cdaf9b1e7d0 410 while ((rc = cycle(atimer.left_ms())) != packet_type);
PavelSavyhin 0:4cdaf9b1e7d0 411
PavelSavyhin 0:4cdaf9b1e7d0 412 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 413 }
PavelSavyhin 0:4cdaf9b1e7d0 414
PavelSavyhin 0:4cdaf9b1e7d0 415
template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options)
{
    connect_timer.countdown(limits.command_timeout_ms);

    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
    if (options == 0)
        options = &default_options; // set default options if none were supplied

    this->keepAliveInterval = options->keepAliveInterval;
    ping_timer.countdown(this->keepAliveInterval);
    int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
    int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet
    if (rc != len)
        goto exit; // there was a problem

    if (resultHandler == 0) // wait until the connack is received
    {
        // report failure unless a valid CONNACK arrives before the timeout;
        // otherwise rc would still hold the number of bytes sent
        rc = -1;

        // this will be a blocking call, wait for the connack
        if (waitfor(CONNACK, connect_timer) == CONNACK)
        {
            int connack_rc = -1;
            if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                rc = connack_rc;
        }
    }
    else
    {
        // set connect response callback function
        connectHandler.attach(resultHandler);

        // start background thread
        this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
    }

exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation()
{
    int found = -1;
    for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
    {
        if (operations[i].id == 0)
        {
            found = i;
            break;
        }
    }
    return found;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
    int index = 0;
    if (this->thread)
        index = findFreeOperation();
    if (index == -1)
        return -1; // no free operation slot, so operations[index] would be out of bounds
    Timer& atimer = operations[index].timer;

    atimer.countdown(limits.command_timeout_ms);
    MQTTString topic = {(char*)topicFilter, 0, 0};

    int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
    int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
    if (rc != len)
        goto exit; // there was a problem

    /* wait for suback */
    if (resultHandler == 0)
    {
        // report failure unless a valid SUBACK arrives before the timeout
        rc = -1;

        // this will block
        if (waitfor(SUBACK, atimer) == SUBACK)
        {
            int count = 0, grantedQoS = -1, mypacketid;
            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                rc = grantedQoS; // 0, 1, 2 or 0x80
            if (rc != -1 && rc != 0x80) // register the handler only for a granted subscription
            {
                for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
                {
                    if (messageHandlers[i].topic == 0)
                    {
                        messageHandlers[i].topic = topicFilter;
                        messageHandlers[i].fp.attach(messageHandler);
                        rc = 0;
                        break;
                    }
                }
            }
        }
    }
    else
    {
        // set subscribe response callback function
        // (not implemented: resultHandler is currently ignored)
    }

exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter)
{
    int index = 0;
    if (this->thread)
        index = findFreeOperation();
    if (index == -1)
        return -1; // no free operation slot, so operations[index] would be out of bounds
    Timer& atimer = operations[index].timer;

    atimer.countdown(limits.command_timeout_ms);
    MQTTString topic = {(char*)topicFilter, 0, 0};

    int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
    int rc = sendPacket(len, atimer.left_ms()); // send the unsubscribe packet
    if (rc != len)
        goto exit; // there was a problem

    // set unsubscribe response callback function
    // (not implemented: the UNSUBACK is not awaited and resultHandler is ignored)

exit:
    return rc;
}



template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message)
{
    int index = 0;
    if (this->thread)
        index = findFreeOperation();
    if (index == -1)
        return -1; // no free operation slot, so operations[index] would be out of bounds
    Timer& atimer = operations[index].timer;

    atimer.countdown(limits.command_timeout_ms);
    MQTTString topic = {(char*)topicName, 0, 0};

    if (message->qos == QOS1 || message->qos == QOS2)
        message->id = packetid.getNext();

    int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen);
    int rc = sendPacket(len, atimer.left_ms()); // send the publish packet
    if (rc != len)
        goto exit; // there was a problem

    /* wait for acks */
    if (resultHandler == 0)
    {
        if (message->qos == QOS1)
        {
            rc = -1; // report failure unless a valid PUBACK arrives before the timeout
            if (waitfor(PUBACK, atimer) == PUBACK)
            {
                int type, dup, mypacketid;
                if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                    rc = 0;
            }
        }
        else if (message->qos == QOS2)
        {
            rc = -1; // report failure unless a valid PUBCOMP arrives before the timeout
            if (waitfor(PUBCOMP, atimer) == PUBCOMP)
            {
                int type, dup, mypacketid;
                if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                    rc = 0;
            }
        }
    }
    else
    {
        // set publish response callback function
        // (not implemented: resultHandler is currently ignored)
    }

exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler)
{
    Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete
    int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
    int rc = sendPacket(len, timer.left_ms()); // send the disconnect packet

    return (rc == len) ? 0 : -1;
}



#endif