Example of AWS IoT connection and Web Dashboard thru STM32 Nucleo evaluation board and mbed OS.

Dependencies:   X_NUCLEO_IKS01A1 mbed FP MQTTPacket DnsQuery ATParser

Introduction

The demo is aimed to STM32 Nucleo board with WiFi and sensors expansions. The board is a "thing" for the AWS IoT service. It updates IoT service shadow with sensors data every second and checks subscription messages.

Hardware Configuration

https://github.com/Klika-Tech/nucleo-aws-iot-demo/raw/master/doc/assets/device.jpg

Software Configuration

  • Import this Project to mbed online compiler
  • Find the next part of code in main.cpp file ...

WiFi network credential

#include "mbed.h"
// WiFi network credential
#define SSID   ""  // Network must be visible otherwise it can't connect
#define PASSW  ""
#error "Wifi SSID & password empty"
  • ... And set it to your Network Name and Password. Do not forget to remove "#error" pragma line.

Information

Nucleo WiFi module is not the same as your smartphone or laptope - it is based on demo board. To avoid connection problems:

  1. Place Nucleo as close to WiFi hot spot as possible. Or...
  2. Turn on mobile hot spot in your laptop as close to the device as possible.
  3. Make sure that hot spot permits 2.4 GHz band communications
  • Setup BackEnd and store certificates using this backend setup instruction
  • Find AWS_IOT_MQTT_HOST define and change it to HTTPS point mentioned in your AWS IoT thing properties named "interact"

#define AWS_IOT_MQTT_HOST              "xxxxxxxxxx.iot.us-east-1.amazonaws.com" //Use your own host.
  • Find the certificate defines clientCRT and clientKey in main.cpp file and change it to ones provided by Amazon.

/**********************************************************************************************
***********************************************************************************************
				Device Identity Certificates: Modify for your AWS IoT Thing
***********************************************************************************************
***********************************************************************************************/

/****************************************
(somecode)-certificate.pem.crt - Amazon signed PEM sertificate.
*****************************************/

//This Client cert is example. Use own instead.
const uint8_t clientCRT[] = "\
-----BEGIN CERTIFICATE-----\n\
MIIDBjCCAe6gAwIBAgIUVph856omeIxW3UPioq+UrX1DbwowDQYJKoZIhvcNAQEL\
BQAwTTFLMEkGA1UECwxCQW1hem9uIFdlYiBTZXJ2aWNlcyBPPUFtYXpvbi5jb20g\
SW5jLiBMPVNlYXR0bGUgU1Q9V2FzaGluZ3RvbiBDPVVTMB4XDTE3MDUyNTExNTEy\
OVoXDTQ5MTIzMTIzNTk1OVowgZUxCzAJBgNVBAYTAkJZMQ4wDAYDVQQIDAVNaW5z\
azEOMAwGA1UEBwwFTWluc2sxFzAVBgNVBAoMDktsaWthLVRlY2ggTExDMRcwFQYD\
VQQLDA5LbGlrYS1UZWNoIExMQzEMMAoGA1UEAwwDUm5EMSYwJAYJKoZIhvcNAQkB\
FhdtdmF0YWxldUBrbGlrYS10ZWNoLmNvbTBZMBMGByqGSM49AgEGCCqGSM49AwEH\
A0IABCJgOQJmoTBJVPfli9Hm/JVixaxkY5rtlgrYO3hSl633A2hg0P/ue0wXDbF3\
aQ0X57IRFE4k4FEbr3UXjT/IczKjYDBeMB8GA1UdIwQYMBaAFK3YzTUPlYB2Li75\
i/z8rEogr1d6MB0GA1UdDgQWBBT18HXBaXFJuAR/0SwegnxJ+pyJ6TAMBgNVHRMB\
Af8EAjAAMA4GA1UdDwEB/wQEAwIHgDANBgkqhkiG9w0BAQsFAAOCAQEAb0Ux1aH5\
RLxjrfGqXN6rPVqh8QQRS+AyBfzmaQN8HaPZMkX5WxXLvcn0A3uWlwQxPPkcZ4zf\
51GHtFFQWB4YZ8dx8mUQ0v/j7onHjCJgZ8iDgwOyKMGtnsDZWCakQw+a6cj+NrMZ\
tzhjwCzEEP6ePcbXwErI5OOzLuWns2L/JEr2wWNkokgRuS8ewr/SQ9OLWIWa2rFM\
ahPNTb3y/qBeWdjeJmhI+TOxdqIpsF8roWP25zwo/zkzCHCjXFBrL+0CA4MpxIl9\
x02i7aAhlJ6ys80lDxdeWeeQJXRKkGknP8mcmKn3iEqqJ5s1dQePj2b5d3ldatya\
wsxQBqqZXzIWEw==\
\n\
-----END CERTIFICATE-----\n";



/**********************************************************************************************
***********************************************************************************************
						Private Key: Modify for your AWS IoT Thing
***********************************************************************************************
***********************************************************************************************/

/********************************************************************8****************************************
nucleo.key.pem - client key generated according to readme.
**************************************************************************************************************/

//This Client Key is example. Use own instead.
const uint8_t clientKey[] ="\
-----BEGIN EC PARAMETERS-----\n\
BggqhkjOPQMBBw==\
-----END EC PARAMETERS-----\n\
-----BEGIN EC PRIVATE KEY-----\n\
MHcCAQEEIHPRfWSC8/k/BsqDWKuP15dXsI9fGwpkTIsLZe6mIrAAoAoGCCqGSM49\
AwEHoUQDQgAEImA5AmahMElU9+WL0eb8lWLFrGRjmu2WCtg7eFKXrfcDaGDQ/+57\
TBcNsXdpDRfnshEUTiTgURuvdReNP8hzMg==\
-----END EC PRIVATE KEY-----\n";

Build and Check

  1. Plugin your board to USB of your PC. USB Disk Drive and USB COM Port should appear in your system.
  2. Open any Serial Console, connect it to your USB Serial Port and setup speed equal to 115200.
  3. Compile this Project and save .bin file to USB Disk Drive
  4. After board reset you should see next log in serial console:

X-NUCLEO-IDW01M1 mbed Application

connecting to AP
LOG:   int main() L#361 Connected to WiFI.
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#186 =====================================
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#187 Connecting WiFi.
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#188 Nucleo IP ADDRESS: X.X.X.X
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#189 Nucleo MAC ADDRESS: 00:11:22:33:44:55
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#190 Server Hostname: xxxxxxxx.iot.us-east-1.amazonaws.com port: 8883
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#191 Client ID: Nucleo
LOG:   int connect(MQTT::Client<MQTTWiFi, Countdown, 350, 5> *, MQTTWiFi *) L#194 =====================================
LOG:   int MQTTSocket::getNTPtime(int) L#58 Success receiving time from ntp server. Tick from 1 Jan 1970 is equal to 1505399292.
--->TCP Connected
--->MQTT Connected
--->>>MQTT subscribed to: Nucleo/test
Length - 245, Publishing {"state": {"reported": {"temperature": 23.690001, "humidity": 98.190002, "pressure": 982.869141, "accelerometer": [-0.009000, 0.030000, 0.971000], "gyroscope": [0.420000, -2.660000, 1.750000], "magnetometer": [-3.600000, -7.100000, 53.300000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.660000, "humidity": 98.010002, "pressure": 982.770264, "accelerometer": [-0.009000, 0.030000, 0.971000], "gyroscope": [0.770000, -2.310000, 1.470000], "magnetometer": [-3.100000, -8.300000, 54.200000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.670000, "humidity": 98.129997, "pressure": 982.724121, "accelerometer": [-0.008000, 0.029000, 0.971000], "gyroscope": [0.630000, -2.380000, 1.400000], "magnetometer": [-3.100000, -7.900000, 53.400000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.690001, "humidity": 98.019997, "pressure": 982.840088, "accelerometer": [-0.009000, 0.030000, 0.972000], "gyroscope": [0.700000, -2.450000, 1.540000], "magnetometer": [-3.700000, -7.900000, 53.400000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.709999, "humidity": 98.040001, "pressure": 982.828613, "accelerometer": [-0.009000, 0.030000, 0.971000], "gyroscope": [0.630000, -2.520000, 1.470000], "magnetometer": [-2.900000, -7.400000, 52.400000]}}}
Length - 245, Publishing {"state": {"reported": {"temperature": 23.719999, "humidity": 97.860001, "pressure": 982.917236, "accelerometer": [-0.026000, 0.103000, 0.891000], "gyroscope": [1.050000, -2.310000, 1.260000], "magnetometer": [-3.300000, -7.100000, 53.500000]}}}

Information

Device connection state might be checked by Green Led on the board. Green light means that device is connected and transferring data to cloud.

  1. Configure and start your dashboard using instruction and corresponding sources from github
  2. Use Blue button to set up markers to charts.
  3. Use AWS IoT console MQTT Client to test device subscription to "Nucleo/test". Just publish any message to this topic and serial port output.
  4. PROFIT!
Committer:
PavelSavyhin
Date:
Thu Oct 19 11:36:41 2017 +0000
Revision:
1:042ca9148926
Parent:
0:4cdaf9b1e7d0
Connection times are optimized and logging is extended.

Who changed what in which revision?

UserRevisionLine numberNew contents of line
PavelSavyhin 0:4cdaf9b1e7d0 1 /*******************************************************************************
PavelSavyhin 0:4cdaf9b1e7d0 2 * Copyright (c) 2014, 2015 IBM Corp.
PavelSavyhin 0:4cdaf9b1e7d0 3 *
PavelSavyhin 0:4cdaf9b1e7d0 4 * All rights reserved. This program and the accompanying materials
PavelSavyhin 0:4cdaf9b1e7d0 5 * are made available under the terms of the Eclipse Public License v1.0
PavelSavyhin 0:4cdaf9b1e7d0 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
PavelSavyhin 0:4cdaf9b1e7d0 7 *
PavelSavyhin 0:4cdaf9b1e7d0 8 * The Eclipse Public License is available at
PavelSavyhin 0:4cdaf9b1e7d0 9 * http://www.eclipse.org/legal/epl-v10.html
PavelSavyhin 0:4cdaf9b1e7d0 10 * and the Eclipse Distribution License is available at
PavelSavyhin 0:4cdaf9b1e7d0 11 * http://www.eclipse.org/org/documents/edl-v10.php.
PavelSavyhin 0:4cdaf9b1e7d0 12 *
PavelSavyhin 0:4cdaf9b1e7d0 13 * Contributors: *
PavelSavyhin 0:4cdaf9b1e7d0 14 * Ian Craggs - initial API and implementation and/or initial documentation
PavelSavyhin 0:4cdaf9b1e7d0 15 * Ian Craggs - fix for bug 458512 - QoS 2 messages
PavelSavyhin 0:4cdaf9b1e7d0 16 * Ian Craggs - fix for bug 460389 - send loop uses wrong length
PavelSavyhin 0:4cdaf9b1e7d0 17 * Ian Craggs - fix for bug 464169 - clearing subscriptions
PavelSavyhin 0:4cdaf9b1e7d0 18 * Ian Craggs - fix for bug 464551 - enums and ints can be different size
PavelSavyhin 0:4cdaf9b1e7d0 19 *******************************************************************************/
PavelSavyhin 0:4cdaf9b1e7d0 20
PavelSavyhin 0:4cdaf9b1e7d0 21 #if !defined(MQTTCLIENT_H)
PavelSavyhin 0:4cdaf9b1e7d0 22 #define MQTTCLIENT_H
PavelSavyhin 0:4cdaf9b1e7d0 23
PavelSavyhin 0:4cdaf9b1e7d0 24 #include "FP.h"
PavelSavyhin 0:4cdaf9b1e7d0 25 #include "MQTTPacket.h"
PavelSavyhin 0:4cdaf9b1e7d0 26 #include "stdio.h"
PavelSavyhin 0:4cdaf9b1e7d0 27 #include "MQTTLogging.h"
PavelSavyhin 0:4cdaf9b1e7d0 28
PavelSavyhin 0:4cdaf9b1e7d0 29 #if !defined(MQTTCLIENT_QOS1)
PavelSavyhin 0:4cdaf9b1e7d0 30 #define MQTTCLIENT_QOS1 1
PavelSavyhin 0:4cdaf9b1e7d0 31 #endif
PavelSavyhin 0:4cdaf9b1e7d0 32 #if !defined(MQTTCLIENT_QOS2)
PavelSavyhin 0:4cdaf9b1e7d0 33 #define MQTTCLIENT_QOS2 0
PavelSavyhin 0:4cdaf9b1e7d0 34 #endif
PavelSavyhin 0:4cdaf9b1e7d0 35
PavelSavyhin 0:4cdaf9b1e7d0 36 namespace MQTT
PavelSavyhin 0:4cdaf9b1e7d0 37 {
PavelSavyhin 0:4cdaf9b1e7d0 38
PavelSavyhin 0:4cdaf9b1e7d0 39
PavelSavyhin 0:4cdaf9b1e7d0 40 enum QoS { QOS0, QOS1, QOS2 };
PavelSavyhin 0:4cdaf9b1e7d0 41
PavelSavyhin 0:4cdaf9b1e7d0 42 // all failure return codes must be negative
PavelSavyhin 0:4cdaf9b1e7d0 43 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
PavelSavyhin 0:4cdaf9b1e7d0 44
PavelSavyhin 0:4cdaf9b1e7d0 45
PavelSavyhin 0:4cdaf9b1e7d0 46 struct Message
PavelSavyhin 0:4cdaf9b1e7d0 47 {
PavelSavyhin 0:4cdaf9b1e7d0 48 enum QoS qos;
PavelSavyhin 0:4cdaf9b1e7d0 49 bool retained;
PavelSavyhin 0:4cdaf9b1e7d0 50 bool dup;
PavelSavyhin 0:4cdaf9b1e7d0 51 unsigned short id;
PavelSavyhin 0:4cdaf9b1e7d0 52 void *payload;
PavelSavyhin 0:4cdaf9b1e7d0 53 size_t payloadlen;
PavelSavyhin 0:4cdaf9b1e7d0 54 };
PavelSavyhin 0:4cdaf9b1e7d0 55
PavelSavyhin 0:4cdaf9b1e7d0 56
PavelSavyhin 0:4cdaf9b1e7d0 57 struct MessageData
PavelSavyhin 0:4cdaf9b1e7d0 58 {
PavelSavyhin 0:4cdaf9b1e7d0 59 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
PavelSavyhin 0:4cdaf9b1e7d0 60 { }
PavelSavyhin 0:4cdaf9b1e7d0 61
PavelSavyhin 0:4cdaf9b1e7d0 62 struct Message &message;
PavelSavyhin 0:4cdaf9b1e7d0 63 MQTTString &topicName;
PavelSavyhin 0:4cdaf9b1e7d0 64 };
PavelSavyhin 0:4cdaf9b1e7d0 65
PavelSavyhin 0:4cdaf9b1e7d0 66
PavelSavyhin 0:4cdaf9b1e7d0 67 class PacketId
PavelSavyhin 0:4cdaf9b1e7d0 68 {
PavelSavyhin 0:4cdaf9b1e7d0 69 public:
PavelSavyhin 0:4cdaf9b1e7d0 70 PacketId()
PavelSavyhin 0:4cdaf9b1e7d0 71 {
PavelSavyhin 0:4cdaf9b1e7d0 72 next = 0;
PavelSavyhin 0:4cdaf9b1e7d0 73 }
PavelSavyhin 0:4cdaf9b1e7d0 74
PavelSavyhin 0:4cdaf9b1e7d0 75 int getNext()
PavelSavyhin 0:4cdaf9b1e7d0 76 {
PavelSavyhin 0:4cdaf9b1e7d0 77 return next = (next == MAX_PACKET_ID) ? 1 : ++next;
PavelSavyhin 0:4cdaf9b1e7d0 78 }
PavelSavyhin 0:4cdaf9b1e7d0 79
PavelSavyhin 0:4cdaf9b1e7d0 80 private:
PavelSavyhin 0:4cdaf9b1e7d0 81 static const int MAX_PACKET_ID = 65535;
PavelSavyhin 0:4cdaf9b1e7d0 82 int next;
PavelSavyhin 0:4cdaf9b1e7d0 83 };
PavelSavyhin 0:4cdaf9b1e7d0 84
PavelSavyhin 0:4cdaf9b1e7d0 85
PavelSavyhin 0:4cdaf9b1e7d0 86 /**
PavelSavyhin 0:4cdaf9b1e7d0 87 * @class Client
PavelSavyhin 0:4cdaf9b1e7d0 88 * @brief blocking, non-threaded MQTT client API
PavelSavyhin 0:4cdaf9b1e7d0 89 *
PavelSavyhin 0:4cdaf9b1e7d0 90 * This version of the API blocks on all method calls, until they are complete. This means that only one
PavelSavyhin 0:4cdaf9b1e7d0 91 * MQTT request can be in process at any one time.
PavelSavyhin 0:4cdaf9b1e7d0 92 * @param Network a network class which supports send, receive
PavelSavyhin 0:4cdaf9b1e7d0 93 * @param Timer a timer class with the methods:
PavelSavyhin 0:4cdaf9b1e7d0 94 */
PavelSavyhin 0:4cdaf9b1e7d0 95 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
PavelSavyhin 0:4cdaf9b1e7d0 96 class Client
PavelSavyhin 0:4cdaf9b1e7d0 97 {
PavelSavyhin 0:4cdaf9b1e7d0 98
PavelSavyhin 0:4cdaf9b1e7d0 99 public:
PavelSavyhin 0:4cdaf9b1e7d0 100
PavelSavyhin 0:4cdaf9b1e7d0 101 typedef void (*messageHandler)(MessageData&);
PavelSavyhin 0:4cdaf9b1e7d0 102
PavelSavyhin 0:4cdaf9b1e7d0 103 /** Construct the client
PavelSavyhin 0:4cdaf9b1e7d0 104 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
PavelSavyhin 0:4cdaf9b1e7d0 105 * before calling MQTT connect
PavelSavyhin 0:4cdaf9b1e7d0 106 * @param limits an instance of the Limit class - to alter limits as required
PavelSavyhin 0:4cdaf9b1e7d0 107 */
PavelSavyhin 0:4cdaf9b1e7d0 108 Client(Network& network, unsigned int command_timeout_ms = 30000);
PavelSavyhin 0:4cdaf9b1e7d0 109
PavelSavyhin 0:4cdaf9b1e7d0 110 /** Set the default message handling callback - used for any message which does not match a subscription message handler
PavelSavyhin 0:4cdaf9b1e7d0 111 * @param mh - pointer to the callback function
PavelSavyhin 0:4cdaf9b1e7d0 112 */
PavelSavyhin 0:4cdaf9b1e7d0 113 void setDefaultMessageHandler(messageHandler mh)
PavelSavyhin 0:4cdaf9b1e7d0 114 {
PavelSavyhin 0:4cdaf9b1e7d0 115 defaultMessageHandler.attach(mh);
PavelSavyhin 0:4cdaf9b1e7d0 116 }
PavelSavyhin 0:4cdaf9b1e7d0 117
PavelSavyhin 0:4cdaf9b1e7d0 118 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
PavelSavyhin 0:4cdaf9b1e7d0 119 * The nework object must be connected to the network endpoint before calling this
PavelSavyhin 0:4cdaf9b1e7d0 120 * Default connect options are used
PavelSavyhin 0:4cdaf9b1e7d0 121 * @return success code -
PavelSavyhin 0:4cdaf9b1e7d0 122 */
PavelSavyhin 0:4cdaf9b1e7d0 123 int connect();
PavelSavyhin 0:4cdaf9b1e7d0 124
PavelSavyhin 0:4cdaf9b1e7d0 125 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
PavelSavyhin 0:4cdaf9b1e7d0 126 * The nework object must be connected to the network endpoint before calling this
PavelSavyhin 0:4cdaf9b1e7d0 127 * @param options - connect options
PavelSavyhin 0:4cdaf9b1e7d0 128 * @return success code -
PavelSavyhin 0:4cdaf9b1e7d0 129 */
PavelSavyhin 0:4cdaf9b1e7d0 130 int connect(MQTTPacket_connectData& options);
PavelSavyhin 0:4cdaf9b1e7d0 131
PavelSavyhin 0:4cdaf9b1e7d0 132 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
PavelSavyhin 0:4cdaf9b1e7d0 133 * @param topic - the topic to publish to
PavelSavyhin 0:4cdaf9b1e7d0 134 * @param message - the message to send
PavelSavyhin 0:4cdaf9b1e7d0 135 * @return success code -
PavelSavyhin 0:4cdaf9b1e7d0 136 */
PavelSavyhin 0:4cdaf9b1e7d0 137 int publish(const char* topicName, Message& message);
PavelSavyhin 0:4cdaf9b1e7d0 138
PavelSavyhin 0:4cdaf9b1e7d0 139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
PavelSavyhin 0:4cdaf9b1e7d0 140 * @param topic - the topic to publish to
PavelSavyhin 0:4cdaf9b1e7d0 141 * @param payload - the data to send
PavelSavyhin 0:4cdaf9b1e7d0 142 * @param payloadlen - the length of the data
PavelSavyhin 0:4cdaf9b1e7d0 143 * @param qos - the QoS to send the publish at
PavelSavyhin 0:4cdaf9b1e7d0 144 * @param retained - whether the message should be retained
PavelSavyhin 0:4cdaf9b1e7d0 145 * @return success code -
PavelSavyhin 0:4cdaf9b1e7d0 146 */
PavelSavyhin 0:4cdaf9b1e7d0 147 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
PavelSavyhin 0:4cdaf9b1e7d0 148
PavelSavyhin 0:4cdaf9b1e7d0 149 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
PavelSavyhin 0:4cdaf9b1e7d0 150 * @param topic - the topic to publish to
PavelSavyhin 0:4cdaf9b1e7d0 151 * @param payload - the data to send
PavelSavyhin 0:4cdaf9b1e7d0 152 * @param payloadlen - the length of the data
PavelSavyhin 0:4cdaf9b1e7d0 153 * @param id - the packet id used - returned
PavelSavyhin 0:4cdaf9b1e7d0 154 * @param qos - the QoS to send the publish at
PavelSavyhin 0:4cdaf9b1e7d0 155 * @param retained - whether the message should be retained
PavelSavyhin 0:4cdaf9b1e7d0 156 * @return success code -
PavelSavyhin 0:4cdaf9b1e7d0 157 */
PavelSavyhin 0:4cdaf9b1e7d0 158 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
PavelSavyhin 0:4cdaf9b1e7d0 159
PavelSavyhin 0:4cdaf9b1e7d0 160 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
PavelSavyhin 0:4cdaf9b1e7d0 161 * @param topicFilter - a topic pattern which can include wildcards
PavelSavyhin 0:4cdaf9b1e7d0 162 * @param qos - the MQTT QoS to subscribe at
PavelSavyhin 0:4cdaf9b1e7d0 163 * @param mh - the callback function to be invoked when a message is received for this subscription
PavelSavyhin 0:4cdaf9b1e7d0 164 * @return success code -
PavelSavyhin 0:4cdaf9b1e7d0 165 */
PavelSavyhin 0:4cdaf9b1e7d0 166 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
PavelSavyhin 0:4cdaf9b1e7d0 167
PavelSavyhin 0:4cdaf9b1e7d0 168 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
PavelSavyhin 0:4cdaf9b1e7d0 169 * @param topicFilter - a topic pattern which can include wildcards
PavelSavyhin 0:4cdaf9b1e7d0 170 * @return success code -
PavelSavyhin 0:4cdaf9b1e7d0 171 */
PavelSavyhin 0:4cdaf9b1e7d0 172 int unsubscribe(const char* topicFilter);
PavelSavyhin 0:4cdaf9b1e7d0 173
PavelSavyhin 0:4cdaf9b1e7d0 174 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
PavelSavyhin 0:4cdaf9b1e7d0 175 * @return success code -
PavelSavyhin 0:4cdaf9b1e7d0 176 */
PavelSavyhin 0:4cdaf9b1e7d0 177 int disconnect();
PavelSavyhin 0:4cdaf9b1e7d0 178
PavelSavyhin 0:4cdaf9b1e7d0 179 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
PavelSavyhin 0:4cdaf9b1e7d0 180 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
PavelSavyhin 0:4cdaf9b1e7d0 181 * received.
PavelSavyhin 0:4cdaf9b1e7d0 182 * @param timeout_ms the time to wait, in milliseconds
PavelSavyhin 0:4cdaf9b1e7d0 183 * @return success code - on failure, this means the client has disconnected
PavelSavyhin 0:4cdaf9b1e7d0 184 */
PavelSavyhin 0:4cdaf9b1e7d0 185 int yield(unsigned long timeout_ms = 1000L);
PavelSavyhin 0:4cdaf9b1e7d0 186
PavelSavyhin 0:4cdaf9b1e7d0 187 /** Is the client connected?
PavelSavyhin 0:4cdaf9b1e7d0 188 * @return flag - is the client connected or not?
PavelSavyhin 0:4cdaf9b1e7d0 189 */
PavelSavyhin 0:4cdaf9b1e7d0 190 bool isConnected()
PavelSavyhin 0:4cdaf9b1e7d0 191 {
PavelSavyhin 0:4cdaf9b1e7d0 192 return isconnected;
PavelSavyhin 0:4cdaf9b1e7d0 193 }
PavelSavyhin 0:4cdaf9b1e7d0 194
PavelSavyhin 0:4cdaf9b1e7d0 195 private:
PavelSavyhin 0:4cdaf9b1e7d0 196
PavelSavyhin 0:4cdaf9b1e7d0 197 void cleanSession();
PavelSavyhin 0:4cdaf9b1e7d0 198 int cycle(Timer& timer);
PavelSavyhin 0:4cdaf9b1e7d0 199 int waitfor(int packet_type, Timer& timer);
PavelSavyhin 0:4cdaf9b1e7d0 200 int keepalive();
PavelSavyhin 0:4cdaf9b1e7d0 201 int publish(int len, Timer& timer, enum QoS qos);
PavelSavyhin 0:4cdaf9b1e7d0 202
PavelSavyhin 0:4cdaf9b1e7d0 203 int decodePacket(int* value, int timeout);
PavelSavyhin 0:4cdaf9b1e7d0 204 int readPacket(Timer& timer);
PavelSavyhin 0:4cdaf9b1e7d0 205 int sendPacket(int length, Timer& timer);
PavelSavyhin 0:4cdaf9b1e7d0 206 int deliverMessage(MQTTString& topicName, Message& message);
PavelSavyhin 0:4cdaf9b1e7d0 207 bool isTopicMatched(char* topicFilter, MQTTString& topicName);
PavelSavyhin 0:4cdaf9b1e7d0 208
PavelSavyhin 0:4cdaf9b1e7d0 209 Network& ipstack;
PavelSavyhin 0:4cdaf9b1e7d0 210 unsigned long command_timeout_ms;
PavelSavyhin 0:4cdaf9b1e7d0 211
PavelSavyhin 0:4cdaf9b1e7d0 212 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
PavelSavyhin 0:4cdaf9b1e7d0 213 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
PavelSavyhin 0:4cdaf9b1e7d0 214
PavelSavyhin 0:4cdaf9b1e7d0 215 Timer last_sent, last_received;
PavelSavyhin 0:4cdaf9b1e7d0 216 unsigned int keepAliveInterval;
PavelSavyhin 0:4cdaf9b1e7d0 217 bool ping_outstanding;
PavelSavyhin 0:4cdaf9b1e7d0 218 bool cleansession;
PavelSavyhin 0:4cdaf9b1e7d0 219
PavelSavyhin 0:4cdaf9b1e7d0 220 PacketId packetid;
PavelSavyhin 0:4cdaf9b1e7d0 221
PavelSavyhin 0:4cdaf9b1e7d0 222 struct MessageHandlers
PavelSavyhin 0:4cdaf9b1e7d0 223 {
PavelSavyhin 0:4cdaf9b1e7d0 224 const char* topicFilter;
PavelSavyhin 0:4cdaf9b1e7d0 225 FP<void, MessageData&> fp;
PavelSavyhin 0:4cdaf9b1e7d0 226 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
PavelSavyhin 0:4cdaf9b1e7d0 227
PavelSavyhin 0:4cdaf9b1e7d0 228 FP<void, MessageData&> defaultMessageHandler;
PavelSavyhin 0:4cdaf9b1e7d0 229
PavelSavyhin 0:4cdaf9b1e7d0 230 bool isconnected;
PavelSavyhin 0:4cdaf9b1e7d0 231
PavelSavyhin 0:4cdaf9b1e7d0 232 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 233 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect
PavelSavyhin 0:4cdaf9b1e7d0 234 int inflightLen;
PavelSavyhin 0:4cdaf9b1e7d0 235 unsigned short inflightMsgid;
PavelSavyhin 0:4cdaf9b1e7d0 236 enum QoS inflightQoS;
PavelSavyhin 0:4cdaf9b1e7d0 237 #endif
PavelSavyhin 0:4cdaf9b1e7d0 238
PavelSavyhin 0:4cdaf9b1e7d0 239 #if MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 240 bool pubrel;
PavelSavyhin 0:4cdaf9b1e7d0 241 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
PavelSavyhin 0:4cdaf9b1e7d0 242 #define MAX_INCOMING_QOS2_MESSAGES 10
PavelSavyhin 0:4cdaf9b1e7d0 243 #endif
PavelSavyhin 0:4cdaf9b1e7d0 244 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
PavelSavyhin 0:4cdaf9b1e7d0 245 bool isQoS2msgidFree(unsigned short id);
PavelSavyhin 0:4cdaf9b1e7d0 246 bool useQoS2msgid(unsigned short id);
PavelSavyhin 0:4cdaf9b1e7d0 247 void freeQoS2msgid(unsigned short id);
PavelSavyhin 0:4cdaf9b1e7d0 248 #endif
PavelSavyhin 0:4cdaf9b1e7d0 249
PavelSavyhin 0:4cdaf9b1e7d0 250 };
PavelSavyhin 0:4cdaf9b1e7d0 251
PavelSavyhin 0:4cdaf9b1e7d0 252 }
PavelSavyhin 0:4cdaf9b1e7d0 253
PavelSavyhin 0:4cdaf9b1e7d0 254
PavelSavyhin 0:4cdaf9b1e7d0 255 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
PavelSavyhin 0:4cdaf9b1e7d0 256 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
PavelSavyhin 0:4cdaf9b1e7d0 257 {
PavelSavyhin 0:4cdaf9b1e7d0 258 ping_outstanding = false;
PavelSavyhin 0:4cdaf9b1e7d0 259 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 260 messageHandlers[i].topicFilter = 0;
PavelSavyhin 0:4cdaf9b1e7d0 261 isconnected = false;
PavelSavyhin 0:4cdaf9b1e7d0 262
PavelSavyhin 0:4cdaf9b1e7d0 263 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 264 inflightMsgid = 0;
PavelSavyhin 0:4cdaf9b1e7d0 265 inflightQoS = QOS0;
PavelSavyhin 0:4cdaf9b1e7d0 266 #endif
PavelSavyhin 0:4cdaf9b1e7d0 267
PavelSavyhin 0:4cdaf9b1e7d0 268 #if MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 269 pubrel = false;
PavelSavyhin 0:4cdaf9b1e7d0 270 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 271 incomingQoS2messages[i] = 0;
PavelSavyhin 0:4cdaf9b1e7d0 272 #endif
PavelSavyhin 0:4cdaf9b1e7d0 273 }
PavelSavyhin 0:4cdaf9b1e7d0 274
PavelSavyhin 0:4cdaf9b1e7d0 275
PavelSavyhin 0:4cdaf9b1e7d0 276 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
PavelSavyhin 0:4cdaf9b1e7d0 277 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
PavelSavyhin 0:4cdaf9b1e7d0 278 {
PavelSavyhin 0:4cdaf9b1e7d0 279 last_sent = Timer();
PavelSavyhin 0:4cdaf9b1e7d0 280 last_received = Timer();
PavelSavyhin 0:4cdaf9b1e7d0 281 this->command_timeout_ms = command_timeout_ms;
PavelSavyhin 0:4cdaf9b1e7d0 282 cleanSession();
PavelSavyhin 0:4cdaf9b1e7d0 283 }
PavelSavyhin 0:4cdaf9b1e7d0 284
PavelSavyhin 0:4cdaf9b1e7d0 285
PavelSavyhin 0:4cdaf9b1e7d0 286 #if MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 287 template<class Network, class Timer, int a, int b>
PavelSavyhin 0:4cdaf9b1e7d0 288 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
PavelSavyhin 0:4cdaf9b1e7d0 289 {
PavelSavyhin 0:4cdaf9b1e7d0 290 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 291 {
PavelSavyhin 0:4cdaf9b1e7d0 292 if (incomingQoS2messages[i] == id)
PavelSavyhin 0:4cdaf9b1e7d0 293 return false;
PavelSavyhin 0:4cdaf9b1e7d0 294 }
PavelSavyhin 0:4cdaf9b1e7d0 295 return true;
PavelSavyhin 0:4cdaf9b1e7d0 296 }
PavelSavyhin 0:4cdaf9b1e7d0 297
PavelSavyhin 0:4cdaf9b1e7d0 298
PavelSavyhin 0:4cdaf9b1e7d0 299 template<class Network, class Timer, int a, int b>
PavelSavyhin 0:4cdaf9b1e7d0 300 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
PavelSavyhin 0:4cdaf9b1e7d0 301 {
PavelSavyhin 0:4cdaf9b1e7d0 302 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 303 {
PavelSavyhin 0:4cdaf9b1e7d0 304 if (incomingQoS2messages[i] == 0)
PavelSavyhin 0:4cdaf9b1e7d0 305 {
PavelSavyhin 0:4cdaf9b1e7d0 306 incomingQoS2messages[i] = id;
PavelSavyhin 0:4cdaf9b1e7d0 307 return true;
PavelSavyhin 0:4cdaf9b1e7d0 308 }
PavelSavyhin 0:4cdaf9b1e7d0 309 }
PavelSavyhin 0:4cdaf9b1e7d0 310 return false;
PavelSavyhin 0:4cdaf9b1e7d0 311 }
PavelSavyhin 0:4cdaf9b1e7d0 312
PavelSavyhin 0:4cdaf9b1e7d0 313
PavelSavyhin 0:4cdaf9b1e7d0 314 template<class Network, class Timer, int a, int b>
PavelSavyhin 0:4cdaf9b1e7d0 315 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
PavelSavyhin 0:4cdaf9b1e7d0 316 {
PavelSavyhin 0:4cdaf9b1e7d0 317 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 318 {
PavelSavyhin 0:4cdaf9b1e7d0 319 if (incomingQoS2messages[i] == id)
PavelSavyhin 0:4cdaf9b1e7d0 320 {
PavelSavyhin 0:4cdaf9b1e7d0 321 incomingQoS2messages[i] = 0;
PavelSavyhin 0:4cdaf9b1e7d0 322 return;
PavelSavyhin 0:4cdaf9b1e7d0 323 }
PavelSavyhin 0:4cdaf9b1e7d0 324 }
PavelSavyhin 0:4cdaf9b1e7d0 325 }
PavelSavyhin 0:4cdaf9b1e7d0 326 #endif
PavelSavyhin 0:4cdaf9b1e7d0 327
PavelSavyhin 0:4cdaf9b1e7d0 328
PavelSavyhin 0:4cdaf9b1e7d0 329 template<class Network, class Timer, int a, int b>
PavelSavyhin 0:4cdaf9b1e7d0 330 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
PavelSavyhin 0:4cdaf9b1e7d0 331 {
PavelSavyhin 0:4cdaf9b1e7d0 332 int rc = FAILURE,
PavelSavyhin 0:4cdaf9b1e7d0 333 sent = 0;
PavelSavyhin 0:4cdaf9b1e7d0 334
PavelSavyhin 0:4cdaf9b1e7d0 335 while (sent < length && !timer.expired())
PavelSavyhin 0:4cdaf9b1e7d0 336 {
PavelSavyhin 0:4cdaf9b1e7d0 337 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
PavelSavyhin 0:4cdaf9b1e7d0 338 if (rc < 0) // there was an error writing the data
PavelSavyhin 0:4cdaf9b1e7d0 339 break;
PavelSavyhin 0:4cdaf9b1e7d0 340 sent += rc;
PavelSavyhin 0:4cdaf9b1e7d0 341 }
PavelSavyhin 0:4cdaf9b1e7d0 342 if (sent == length)
PavelSavyhin 0:4cdaf9b1e7d0 343 {
PavelSavyhin 0:4cdaf9b1e7d0 344 if (this->keepAliveInterval > 0)
PavelSavyhin 0:4cdaf9b1e7d0 345 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
PavelSavyhin 0:4cdaf9b1e7d0 346 rc = SUCCESS;
PavelSavyhin 0:4cdaf9b1e7d0 347 }
PavelSavyhin 0:4cdaf9b1e7d0 348 else
PavelSavyhin 0:4cdaf9b1e7d0 349 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 350
PavelSavyhin 0:4cdaf9b1e7d0 351 #if defined(MQTT_DEBUG)
PavelSavyhin 0:4cdaf9b1e7d0 352 char printbuf[150];
PavelSavyhin 0:4cdaf9b1e7d0 353 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
PavelSavyhin 0:4cdaf9b1e7d0 354 #endif
PavelSavyhin 0:4cdaf9b1e7d0 355 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 356 }
PavelSavyhin 0:4cdaf9b1e7d0 357
PavelSavyhin 0:4cdaf9b1e7d0 358
PavelSavyhin 0:4cdaf9b1e7d0 359 template<class Network, class Timer, int a, int b>
PavelSavyhin 0:4cdaf9b1e7d0 360 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
PavelSavyhin 0:4cdaf9b1e7d0 361 {
PavelSavyhin 0:4cdaf9b1e7d0 362 unsigned char c;
PavelSavyhin 0:4cdaf9b1e7d0 363 int multiplier = 1;
PavelSavyhin 0:4cdaf9b1e7d0 364 int len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 365 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
PavelSavyhin 0:4cdaf9b1e7d0 366
PavelSavyhin 0:4cdaf9b1e7d0 367 *value = 0;
PavelSavyhin 0:4cdaf9b1e7d0 368 do
PavelSavyhin 0:4cdaf9b1e7d0 369 {
PavelSavyhin 0:4cdaf9b1e7d0 370 int rc = MQTTPACKET_READ_ERROR;
PavelSavyhin 0:4cdaf9b1e7d0 371
PavelSavyhin 0:4cdaf9b1e7d0 372 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
PavelSavyhin 0:4cdaf9b1e7d0 373 {
PavelSavyhin 0:4cdaf9b1e7d0 374 rc = MQTTPACKET_READ_ERROR; /* bad data */
PavelSavyhin 0:4cdaf9b1e7d0 375 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 376 }
PavelSavyhin 0:4cdaf9b1e7d0 377 rc = ipstack.read(&c, 1, timeout);
PavelSavyhin 0:4cdaf9b1e7d0 378 if (rc != 1)
PavelSavyhin 0:4cdaf9b1e7d0 379 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 380 *value += (c & 127) * multiplier;
PavelSavyhin 0:4cdaf9b1e7d0 381 multiplier *= 128;
PavelSavyhin 0:4cdaf9b1e7d0 382 } while ((c & 128) != 0);
PavelSavyhin 0:4cdaf9b1e7d0 383 exit:
PavelSavyhin 0:4cdaf9b1e7d0 384 return len;
PavelSavyhin 0:4cdaf9b1e7d0 385 }
PavelSavyhin 0:4cdaf9b1e7d0 386
PavelSavyhin 0:4cdaf9b1e7d0 387
PavelSavyhin 0:4cdaf9b1e7d0 388 /**
PavelSavyhin 0:4cdaf9b1e7d0 389 * If any read fails in this method, then we should disconnect from the network, as on reconnect
PavelSavyhin 0:4cdaf9b1e7d0 390 * the packets can be retried.
PavelSavyhin 0:4cdaf9b1e7d0 391 * @param timeout the max time to wait for the packet read to complete, in milliseconds
PavelSavyhin 0:4cdaf9b1e7d0 392 * @return the MQTT packet type, or -1 if none
PavelSavyhin 0:4cdaf9b1e7d0 393 */
PavelSavyhin 0:4cdaf9b1e7d0 394 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 395 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
PavelSavyhin 0:4cdaf9b1e7d0 396 {
PavelSavyhin 0:4cdaf9b1e7d0 397 int rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 398 MQTTHeader header = {0};
PavelSavyhin 0:4cdaf9b1e7d0 399 int len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 400 int rem_len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 401
PavelSavyhin 0:4cdaf9b1e7d0 402 /* 1. read the header byte. This has the packet type in it */
PavelSavyhin 0:4cdaf9b1e7d0 403 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
PavelSavyhin 0:4cdaf9b1e7d0 404 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 405
PavelSavyhin 0:4cdaf9b1e7d0 406 len = 1;
PavelSavyhin 0:4cdaf9b1e7d0 407 /* 2. read the remaining length. This is variable in itself */
PavelSavyhin 0:4cdaf9b1e7d0 408 decodePacket(&rem_len, timer.left_ms());
PavelSavyhin 0:4cdaf9b1e7d0 409 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
PavelSavyhin 0:4cdaf9b1e7d0 410
PavelSavyhin 0:4cdaf9b1e7d0 411 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
PavelSavyhin 0:4cdaf9b1e7d0 412 {
PavelSavyhin 0:4cdaf9b1e7d0 413 rc = BUFFER_OVERFLOW;
PavelSavyhin 0:4cdaf9b1e7d0 414 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 415 }
PavelSavyhin 0:4cdaf9b1e7d0 416
PavelSavyhin 0:4cdaf9b1e7d0 417 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
PavelSavyhin 0:4cdaf9b1e7d0 418 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
PavelSavyhin 0:4cdaf9b1e7d0 419 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 420
PavelSavyhin 0:4cdaf9b1e7d0 421 header.byte = readbuf[0];
PavelSavyhin 0:4cdaf9b1e7d0 422 rc = header.bits.type;
PavelSavyhin 0:4cdaf9b1e7d0 423 if (this->keepAliveInterval > 0)
PavelSavyhin 0:4cdaf9b1e7d0 424 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
PavelSavyhin 0:4cdaf9b1e7d0 425 exit:
PavelSavyhin 0:4cdaf9b1e7d0 426
PavelSavyhin 0:4cdaf9b1e7d0 427 #if defined(MQTT_DEBUG)
PavelSavyhin 0:4cdaf9b1e7d0 428 if (rc >= 0)
PavelSavyhin 0:4cdaf9b1e7d0 429 {
PavelSavyhin 0:4cdaf9b1e7d0 430 char printbuf[50];
PavelSavyhin 0:4cdaf9b1e7d0 431 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
PavelSavyhin 0:4cdaf9b1e7d0 432 }
PavelSavyhin 0:4cdaf9b1e7d0 433 #endif
PavelSavyhin 0:4cdaf9b1e7d0 434 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 435 }
PavelSavyhin 0:4cdaf9b1e7d0 436
PavelSavyhin 0:4cdaf9b1e7d0 437
PavelSavyhin 0:4cdaf9b1e7d0 438 // assume topic filter and name is in correct format
PavelSavyhin 0:4cdaf9b1e7d0 439 // # can only be at end
PavelSavyhin 0:4cdaf9b1e7d0 440 // + and # can only be next to separator
PavelSavyhin 0:4cdaf9b1e7d0 441 template<class Network, class Timer, int a, int b>
PavelSavyhin 0:4cdaf9b1e7d0 442 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
PavelSavyhin 0:4cdaf9b1e7d0 443 {
PavelSavyhin 0:4cdaf9b1e7d0 444 char* curf = topicFilter;
PavelSavyhin 0:4cdaf9b1e7d0 445 char* curn = topicName.lenstring.data;
PavelSavyhin 0:4cdaf9b1e7d0 446 char* curn_end = curn + topicName.lenstring.len;
PavelSavyhin 0:4cdaf9b1e7d0 447
PavelSavyhin 0:4cdaf9b1e7d0 448 while (*curf && curn < curn_end)
PavelSavyhin 0:4cdaf9b1e7d0 449 {
PavelSavyhin 0:4cdaf9b1e7d0 450 if (*curn == '/' && *curf != '/')
PavelSavyhin 0:4cdaf9b1e7d0 451 break;
PavelSavyhin 0:4cdaf9b1e7d0 452 if (*curf != '+' && *curf != '#' && *curf != *curn)
PavelSavyhin 0:4cdaf9b1e7d0 453 break;
PavelSavyhin 0:4cdaf9b1e7d0 454 if (*curf == '+')
PavelSavyhin 0:4cdaf9b1e7d0 455 { // skip until we meet the next separator, or end of string
PavelSavyhin 0:4cdaf9b1e7d0 456 char* nextpos = curn + 1;
PavelSavyhin 0:4cdaf9b1e7d0 457 while (nextpos < curn_end && *nextpos != '/')
PavelSavyhin 0:4cdaf9b1e7d0 458 nextpos = ++curn + 1;
PavelSavyhin 0:4cdaf9b1e7d0 459 }
PavelSavyhin 0:4cdaf9b1e7d0 460 else if (*curf == '#')
PavelSavyhin 0:4cdaf9b1e7d0 461 curn = curn_end - 1; // skip until end of string
PavelSavyhin 0:4cdaf9b1e7d0 462 curf++;
PavelSavyhin 0:4cdaf9b1e7d0 463 curn++;
PavelSavyhin 0:4cdaf9b1e7d0 464 };
PavelSavyhin 0:4cdaf9b1e7d0 465
PavelSavyhin 0:4cdaf9b1e7d0 466 return (curn == curn_end) && (*curf == '\0');
PavelSavyhin 0:4cdaf9b1e7d0 467 }
PavelSavyhin 0:4cdaf9b1e7d0 468
PavelSavyhin 0:4cdaf9b1e7d0 469
PavelSavyhin 0:4cdaf9b1e7d0 470
PavelSavyhin 0:4cdaf9b1e7d0 471 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
PavelSavyhin 0:4cdaf9b1e7d0 472 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
PavelSavyhin 0:4cdaf9b1e7d0 473 {
PavelSavyhin 0:4cdaf9b1e7d0 474 int rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 475
PavelSavyhin 0:4cdaf9b1e7d0 476 // we have to find the right message handler - indexed by topic
PavelSavyhin 0:4cdaf9b1e7d0 477 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 478 {
PavelSavyhin 0:4cdaf9b1e7d0 479 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
PavelSavyhin 0:4cdaf9b1e7d0 480 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
PavelSavyhin 0:4cdaf9b1e7d0 481 {
PavelSavyhin 0:4cdaf9b1e7d0 482 if (messageHandlers[i].fp.attached())
PavelSavyhin 0:4cdaf9b1e7d0 483 {
PavelSavyhin 0:4cdaf9b1e7d0 484 MessageData md(topicName, message);
PavelSavyhin 0:4cdaf9b1e7d0 485 messageHandlers[i].fp(md);
PavelSavyhin 0:4cdaf9b1e7d0 486 rc = SUCCESS;
PavelSavyhin 0:4cdaf9b1e7d0 487 }
PavelSavyhin 0:4cdaf9b1e7d0 488 }
PavelSavyhin 0:4cdaf9b1e7d0 489 }
PavelSavyhin 0:4cdaf9b1e7d0 490
PavelSavyhin 0:4cdaf9b1e7d0 491 if (rc == FAILURE && defaultMessageHandler.attached())
PavelSavyhin 0:4cdaf9b1e7d0 492 {
PavelSavyhin 0:4cdaf9b1e7d0 493 MessageData md(topicName, message);
PavelSavyhin 0:4cdaf9b1e7d0 494 defaultMessageHandler(md);
PavelSavyhin 0:4cdaf9b1e7d0 495 rc = SUCCESS;
PavelSavyhin 0:4cdaf9b1e7d0 496 }
PavelSavyhin 0:4cdaf9b1e7d0 497
PavelSavyhin 0:4cdaf9b1e7d0 498 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 499 }
PavelSavyhin 0:4cdaf9b1e7d0 500
PavelSavyhin 0:4cdaf9b1e7d0 501
PavelSavyhin 0:4cdaf9b1e7d0 502
PavelSavyhin 0:4cdaf9b1e7d0 503 template<class Network, class Timer, int a, int b>
PavelSavyhin 0:4cdaf9b1e7d0 504 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
PavelSavyhin 0:4cdaf9b1e7d0 505 {
PavelSavyhin 0:4cdaf9b1e7d0 506 int rc = SUCCESS;
PavelSavyhin 0:4cdaf9b1e7d0 507 Timer timer = Timer();
PavelSavyhin 0:4cdaf9b1e7d0 508
PavelSavyhin 0:4cdaf9b1e7d0 509 timer.countdown_ms(timeout_ms);
PavelSavyhin 0:4cdaf9b1e7d0 510 while (!timer.expired())
PavelSavyhin 0:4cdaf9b1e7d0 511 {
PavelSavyhin 0:4cdaf9b1e7d0 512 if (cycle(timer) < 0)
PavelSavyhin 0:4cdaf9b1e7d0 513 {
PavelSavyhin 0:4cdaf9b1e7d0 514 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 515 break;
PavelSavyhin 0:4cdaf9b1e7d0 516 }
PavelSavyhin 0:4cdaf9b1e7d0 517 }
PavelSavyhin 0:4cdaf9b1e7d0 518
PavelSavyhin 0:4cdaf9b1e7d0 519 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 520 }
PavelSavyhin 0:4cdaf9b1e7d0 521
PavelSavyhin 0:4cdaf9b1e7d0 522
PavelSavyhin 0:4cdaf9b1e7d0 523 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 524 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
PavelSavyhin 0:4cdaf9b1e7d0 525 {
PavelSavyhin 0:4cdaf9b1e7d0 526 /* get one piece of work off the wire and one pass through */
PavelSavyhin 0:4cdaf9b1e7d0 527
PavelSavyhin 0:4cdaf9b1e7d0 528 // read the socket, see what work is due
PavelSavyhin 0:4cdaf9b1e7d0 529 int packet_type = readPacket(timer);
PavelSavyhin 0:4cdaf9b1e7d0 530
PavelSavyhin 0:4cdaf9b1e7d0 531 int len = 0,
PavelSavyhin 0:4cdaf9b1e7d0 532 rc = SUCCESS;
PavelSavyhin 0:4cdaf9b1e7d0 533
PavelSavyhin 0:4cdaf9b1e7d0 534 switch (packet_type)
PavelSavyhin 0:4cdaf9b1e7d0 535 {
PavelSavyhin 0:4cdaf9b1e7d0 536 case FAILURE:
PavelSavyhin 0:4cdaf9b1e7d0 537 case BUFFER_OVERFLOW:
PavelSavyhin 0:4cdaf9b1e7d0 538 rc = packet_type;
PavelSavyhin 0:4cdaf9b1e7d0 539 break;
PavelSavyhin 0:4cdaf9b1e7d0 540 case CONNACK:
PavelSavyhin 0:4cdaf9b1e7d0 541 case PUBACK:
PavelSavyhin 0:4cdaf9b1e7d0 542 case SUBACK:
PavelSavyhin 0:4cdaf9b1e7d0 543 break;
PavelSavyhin 0:4cdaf9b1e7d0 544 case PUBLISH:
PavelSavyhin 0:4cdaf9b1e7d0 545 {
PavelSavyhin 0:4cdaf9b1e7d0 546 MQTTString topicName = MQTTString_initializer;
PavelSavyhin 0:4cdaf9b1e7d0 547 Message msg;
PavelSavyhin 0:4cdaf9b1e7d0 548 int intQoS;
PavelSavyhin 0:4cdaf9b1e7d0 549 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
PavelSavyhin 0:4cdaf9b1e7d0 550 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
PavelSavyhin 0:4cdaf9b1e7d0 551 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 552 msg.qos = (enum QoS)intQoS;
PavelSavyhin 0:4cdaf9b1e7d0 553 #if MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 554 if (msg.qos != QOS2)
PavelSavyhin 0:4cdaf9b1e7d0 555 #endif
PavelSavyhin 0:4cdaf9b1e7d0 556 deliverMessage(topicName, msg);
PavelSavyhin 0:4cdaf9b1e7d0 557 #if MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 558 else if (isQoS2msgidFree(msg.id))
PavelSavyhin 0:4cdaf9b1e7d0 559 {
PavelSavyhin 0:4cdaf9b1e7d0 560 if (useQoS2msgid(msg.id))
PavelSavyhin 0:4cdaf9b1e7d0 561 deliverMessage(topicName, msg);
PavelSavyhin 0:4cdaf9b1e7d0 562 else
PavelSavyhin 0:4cdaf9b1e7d0 563 WARN("Maximum number of incoming QoS2 messages exceeded");
PavelSavyhin 0:4cdaf9b1e7d0 564 }
PavelSavyhin 0:4cdaf9b1e7d0 565 #endif
PavelSavyhin 0:4cdaf9b1e7d0 566 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 567 if (msg.qos != QOS0)
PavelSavyhin 0:4cdaf9b1e7d0 568 {
PavelSavyhin 0:4cdaf9b1e7d0 569 if (msg.qos == QOS1)
PavelSavyhin 0:4cdaf9b1e7d0 570 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
PavelSavyhin 0:4cdaf9b1e7d0 571 else if (msg.qos == QOS2)
PavelSavyhin 0:4cdaf9b1e7d0 572 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
PavelSavyhin 0:4cdaf9b1e7d0 573 if (len <= 0)
PavelSavyhin 0:4cdaf9b1e7d0 574 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 575 else
PavelSavyhin 0:4cdaf9b1e7d0 576 rc = sendPacket(len, timer);
PavelSavyhin 0:4cdaf9b1e7d0 577 if (rc == FAILURE)
PavelSavyhin 0:4cdaf9b1e7d0 578 goto exit; // there was a problem
PavelSavyhin 0:4cdaf9b1e7d0 579 }
PavelSavyhin 0:4cdaf9b1e7d0 580 break;
PavelSavyhin 0:4cdaf9b1e7d0 581 #endif
PavelSavyhin 0:4cdaf9b1e7d0 582 }
PavelSavyhin 0:4cdaf9b1e7d0 583 #if MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 584 case PUBREC:
PavelSavyhin 0:4cdaf9b1e7d0 585 case PUBREL:
PavelSavyhin 0:4cdaf9b1e7d0 586 unsigned short mypacketid;
PavelSavyhin 0:4cdaf9b1e7d0 587 unsigned char dup, type;
PavelSavyhin 0:4cdaf9b1e7d0 588 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
PavelSavyhin 0:4cdaf9b1e7d0 589 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 590 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
PavelSavyhin 0:4cdaf9b1e7d0 591 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
PavelSavyhin 0:4cdaf9b1e7d0 592 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 593 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
PavelSavyhin 0:4cdaf9b1e7d0 594 rc = FAILURE; // there was a problem
PavelSavyhin 0:4cdaf9b1e7d0 595 if (rc == FAILURE)
PavelSavyhin 0:4cdaf9b1e7d0 596 goto exit; // there was a problem
PavelSavyhin 0:4cdaf9b1e7d0 597 if (packet_type == PUBREL)
PavelSavyhin 0:4cdaf9b1e7d0 598 freeQoS2msgid(mypacketid);
PavelSavyhin 0:4cdaf9b1e7d0 599 break;
PavelSavyhin 0:4cdaf9b1e7d0 600
PavelSavyhin 0:4cdaf9b1e7d0 601 case PUBCOMP:
PavelSavyhin 0:4cdaf9b1e7d0 602 break;
PavelSavyhin 0:4cdaf9b1e7d0 603 #endif
PavelSavyhin 0:4cdaf9b1e7d0 604 case PINGRESP:
PavelSavyhin 0:4cdaf9b1e7d0 605 ping_outstanding = false;
PavelSavyhin 0:4cdaf9b1e7d0 606 break;
PavelSavyhin 0:4cdaf9b1e7d0 607 }
PavelSavyhin 0:4cdaf9b1e7d0 608 keepalive();
PavelSavyhin 0:4cdaf9b1e7d0 609 exit:
PavelSavyhin 0:4cdaf9b1e7d0 610 if (rc == SUCCESS)
PavelSavyhin 0:4cdaf9b1e7d0 611 rc = packet_type;
PavelSavyhin 0:4cdaf9b1e7d0 612 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 613 }
PavelSavyhin 0:4cdaf9b1e7d0 614
PavelSavyhin 0:4cdaf9b1e7d0 615
PavelSavyhin 0:4cdaf9b1e7d0 616 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 617 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
PavelSavyhin 0:4cdaf9b1e7d0 618 {
PavelSavyhin 0:4cdaf9b1e7d0 619 int rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 620
PavelSavyhin 0:4cdaf9b1e7d0 621 if (keepAliveInterval == 0)
PavelSavyhin 0:4cdaf9b1e7d0 622 {
PavelSavyhin 0:4cdaf9b1e7d0 623 rc = SUCCESS;
PavelSavyhin 0:4cdaf9b1e7d0 624 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 625 }
PavelSavyhin 0:4cdaf9b1e7d0 626
PavelSavyhin 0:4cdaf9b1e7d0 627 if (last_sent.expired() || last_received.expired())
PavelSavyhin 0:4cdaf9b1e7d0 628 {
PavelSavyhin 0:4cdaf9b1e7d0 629 if (!ping_outstanding)
PavelSavyhin 0:4cdaf9b1e7d0 630 {
PavelSavyhin 0:4cdaf9b1e7d0 631 Timer timer(1000);
PavelSavyhin 0:4cdaf9b1e7d0 632 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
PavelSavyhin 0:4cdaf9b1e7d0 633 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
PavelSavyhin 0:4cdaf9b1e7d0 634 ping_outstanding = true;
PavelSavyhin 0:4cdaf9b1e7d0 635 }
PavelSavyhin 0:4cdaf9b1e7d0 636 }
PavelSavyhin 0:4cdaf9b1e7d0 637
PavelSavyhin 0:4cdaf9b1e7d0 638 exit:
PavelSavyhin 0:4cdaf9b1e7d0 639 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 640 }
PavelSavyhin 0:4cdaf9b1e7d0 641
PavelSavyhin 0:4cdaf9b1e7d0 642
PavelSavyhin 0:4cdaf9b1e7d0 643 // only used in single-threaded mode where one command at a time is in process
PavelSavyhin 0:4cdaf9b1e7d0 644 template<class Network, class Timer, int a, int b>
PavelSavyhin 0:4cdaf9b1e7d0 645 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
PavelSavyhin 0:4cdaf9b1e7d0 646 {
PavelSavyhin 0:4cdaf9b1e7d0 647 int rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 648
PavelSavyhin 0:4cdaf9b1e7d0 649 do
PavelSavyhin 0:4cdaf9b1e7d0 650 {
PavelSavyhin 0:4cdaf9b1e7d0 651 if (timer.expired())
PavelSavyhin 0:4cdaf9b1e7d0 652 break; // we timed out
PavelSavyhin 0:4cdaf9b1e7d0 653 }
PavelSavyhin 0:4cdaf9b1e7d0 654 while ((rc = cycle(timer)) != packet_type);
PavelSavyhin 0:4cdaf9b1e7d0 655
PavelSavyhin 0:4cdaf9b1e7d0 656 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 657 }
PavelSavyhin 0:4cdaf9b1e7d0 658
PavelSavyhin 0:4cdaf9b1e7d0 659
PavelSavyhin 0:4cdaf9b1e7d0 660 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 661 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
PavelSavyhin 0:4cdaf9b1e7d0 662 {
PavelSavyhin 0:4cdaf9b1e7d0 663 Timer connect_timer(command_timeout_ms);
PavelSavyhin 0:4cdaf9b1e7d0 664 int rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 665 int len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 666
PavelSavyhin 0:4cdaf9b1e7d0 667 if (isconnected) // don't send connect packet again if we are already connected
PavelSavyhin 0:4cdaf9b1e7d0 668 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 669
PavelSavyhin 0:4cdaf9b1e7d0 670 this->keepAliveInterval = options.keepAliveInterval;
PavelSavyhin 0:4cdaf9b1e7d0 671 this->cleansession = options.cleansession;
PavelSavyhin 0:4cdaf9b1e7d0 672 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
PavelSavyhin 0:4cdaf9b1e7d0 673 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 674 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
PavelSavyhin 0:4cdaf9b1e7d0 675 goto exit; // there was a problem
PavelSavyhin 0:4cdaf9b1e7d0 676
PavelSavyhin 0:4cdaf9b1e7d0 677 if (this->keepAliveInterval > 0)
PavelSavyhin 0:4cdaf9b1e7d0 678 last_received.countdown(this->keepAliveInterval);
PavelSavyhin 0:4cdaf9b1e7d0 679 // this will be a blocking call, wait for the connack
PavelSavyhin 0:4cdaf9b1e7d0 680 if (waitfor(CONNACK, connect_timer) == CONNACK)
PavelSavyhin 0:4cdaf9b1e7d0 681 {
PavelSavyhin 0:4cdaf9b1e7d0 682 unsigned char connack_rc = 255;
PavelSavyhin 0:4cdaf9b1e7d0 683 bool sessionPresent = false;
PavelSavyhin 0:4cdaf9b1e7d0 684 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
PavelSavyhin 0:4cdaf9b1e7d0 685 rc = connack_rc;
PavelSavyhin 0:4cdaf9b1e7d0 686 else
PavelSavyhin 0:4cdaf9b1e7d0 687 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 688 }
PavelSavyhin 0:4cdaf9b1e7d0 689 else
PavelSavyhin 0:4cdaf9b1e7d0 690 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 691
PavelSavyhin 0:4cdaf9b1e7d0 692 #if MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 693 // resend any inflight publish
PavelSavyhin 0:4cdaf9b1e7d0 694 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
PavelSavyhin 0:4cdaf9b1e7d0 695 {
PavelSavyhin 0:4cdaf9b1e7d0 696 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
PavelSavyhin 0:4cdaf9b1e7d0 697 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 698 else
PavelSavyhin 0:4cdaf9b1e7d0 699 rc = publish(len, connect_timer, inflightQoS);
PavelSavyhin 0:4cdaf9b1e7d0 700 }
PavelSavyhin 0:4cdaf9b1e7d0 701 else
PavelSavyhin 0:4cdaf9b1e7d0 702 #endif
PavelSavyhin 0:4cdaf9b1e7d0 703 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 704 if (inflightMsgid > 0)
PavelSavyhin 0:4cdaf9b1e7d0 705 {
PavelSavyhin 0:4cdaf9b1e7d0 706 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
PavelSavyhin 0:4cdaf9b1e7d0 707 rc = publish(inflightLen, connect_timer, inflightQoS);
PavelSavyhin 0:4cdaf9b1e7d0 708 }
PavelSavyhin 0:4cdaf9b1e7d0 709 #endif
PavelSavyhin 0:4cdaf9b1e7d0 710
PavelSavyhin 0:4cdaf9b1e7d0 711 exit:
PavelSavyhin 0:4cdaf9b1e7d0 712 if (rc == SUCCESS)
PavelSavyhin 0:4cdaf9b1e7d0 713 isconnected = true;
PavelSavyhin 0:4cdaf9b1e7d0 714 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 715 }
PavelSavyhin 0:4cdaf9b1e7d0 716
PavelSavyhin 0:4cdaf9b1e7d0 717
PavelSavyhin 0:4cdaf9b1e7d0 718 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 719 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
PavelSavyhin 0:4cdaf9b1e7d0 720 {
PavelSavyhin 0:4cdaf9b1e7d0 721 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
PavelSavyhin 0:4cdaf9b1e7d0 722 return connect(default_options);
PavelSavyhin 0:4cdaf9b1e7d0 723 }
PavelSavyhin 0:4cdaf9b1e7d0 724
PavelSavyhin 0:4cdaf9b1e7d0 725
PavelSavyhin 0:4cdaf9b1e7d0 726 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
PavelSavyhin 0:4cdaf9b1e7d0 727 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
PavelSavyhin 0:4cdaf9b1e7d0 728 {
PavelSavyhin 0:4cdaf9b1e7d0 729 int rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 730 Timer timer(command_timeout_ms);
PavelSavyhin 0:4cdaf9b1e7d0 731 int len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 732 MQTTString topic = {(char*)topicFilter, {0, 0}};
PavelSavyhin 0:4cdaf9b1e7d0 733
PavelSavyhin 0:4cdaf9b1e7d0 734 if (!isconnected)
PavelSavyhin 0:4cdaf9b1e7d0 735 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 736
PavelSavyhin 0:4cdaf9b1e7d0 737 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
PavelSavyhin 0:4cdaf9b1e7d0 738 if (len <= 0)
PavelSavyhin 0:4cdaf9b1e7d0 739 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 740 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
PavelSavyhin 0:4cdaf9b1e7d0 741 goto exit; // there was a problem
PavelSavyhin 0:4cdaf9b1e7d0 742
PavelSavyhin 0:4cdaf9b1e7d0 743 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
PavelSavyhin 0:4cdaf9b1e7d0 744 {
PavelSavyhin 0:4cdaf9b1e7d0 745 int count = 0, grantedQoS = -1;
PavelSavyhin 0:4cdaf9b1e7d0 746 unsigned short mypacketid;
PavelSavyhin 0:4cdaf9b1e7d0 747 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
PavelSavyhin 0:4cdaf9b1e7d0 748 rc = grantedQoS; // 0, 1, 2 or 0x80
PavelSavyhin 0:4cdaf9b1e7d0 749 if (rc != 0x80)
PavelSavyhin 0:4cdaf9b1e7d0 750 {
PavelSavyhin 0:4cdaf9b1e7d0 751 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 752 {
PavelSavyhin 0:4cdaf9b1e7d0 753 if (messageHandlers[i].topicFilter == 0)
PavelSavyhin 0:4cdaf9b1e7d0 754 {
PavelSavyhin 0:4cdaf9b1e7d0 755 messageHandlers[i].topicFilter = topicFilter;
PavelSavyhin 0:4cdaf9b1e7d0 756 messageHandlers[i].fp.attach(messageHandler);
PavelSavyhin 0:4cdaf9b1e7d0 757 rc = 0;
PavelSavyhin 0:4cdaf9b1e7d0 758 break;
PavelSavyhin 0:4cdaf9b1e7d0 759 }
PavelSavyhin 0:4cdaf9b1e7d0 760 }
PavelSavyhin 0:4cdaf9b1e7d0 761 }
PavelSavyhin 0:4cdaf9b1e7d0 762 }
PavelSavyhin 0:4cdaf9b1e7d0 763 else
PavelSavyhin 0:4cdaf9b1e7d0 764 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 765
PavelSavyhin 0:4cdaf9b1e7d0 766 exit:
PavelSavyhin 0:4cdaf9b1e7d0 767 if (rc != SUCCESS)
PavelSavyhin 0:4cdaf9b1e7d0 768 cleanSession();
PavelSavyhin 0:4cdaf9b1e7d0 769 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 770 }
PavelSavyhin 0:4cdaf9b1e7d0 771
PavelSavyhin 0:4cdaf9b1e7d0 772
PavelSavyhin 0:4cdaf9b1e7d0 773 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
PavelSavyhin 0:4cdaf9b1e7d0 774 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
PavelSavyhin 0:4cdaf9b1e7d0 775 {
PavelSavyhin 0:4cdaf9b1e7d0 776 int rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 777 Timer timer(command_timeout_ms);
PavelSavyhin 0:4cdaf9b1e7d0 778 MQTTString topic = {(char*)topicFilter, {0, 0}};
PavelSavyhin 0:4cdaf9b1e7d0 779 int len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 780
PavelSavyhin 0:4cdaf9b1e7d0 781 if (!isconnected)
PavelSavyhin 0:4cdaf9b1e7d0 782 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 783
PavelSavyhin 0:4cdaf9b1e7d0 784 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
PavelSavyhin 0:4cdaf9b1e7d0 785 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 786 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
PavelSavyhin 0:4cdaf9b1e7d0 787 goto exit; // there was a problem
PavelSavyhin 0:4cdaf9b1e7d0 788
PavelSavyhin 0:4cdaf9b1e7d0 789 if (waitfor(UNSUBACK, timer) == UNSUBACK)
PavelSavyhin 0:4cdaf9b1e7d0 790 {
PavelSavyhin 0:4cdaf9b1e7d0 791 unsigned short mypacketid; // should be the same as the packetid above
PavelSavyhin 0:4cdaf9b1e7d0 792 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
PavelSavyhin 0:4cdaf9b1e7d0 793 {
PavelSavyhin 0:4cdaf9b1e7d0 794 rc = 0;
PavelSavyhin 0:4cdaf9b1e7d0 795
PavelSavyhin 0:4cdaf9b1e7d0 796 // remove the subscription message handler associated with this topic, if there is one
PavelSavyhin 0:4cdaf9b1e7d0 797 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
PavelSavyhin 0:4cdaf9b1e7d0 798 {
PavelSavyhin 0:4cdaf9b1e7d0 799 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
PavelSavyhin 0:4cdaf9b1e7d0 800 {
PavelSavyhin 0:4cdaf9b1e7d0 801 messageHandlers[i].topicFilter = 0;
PavelSavyhin 0:4cdaf9b1e7d0 802 break;
PavelSavyhin 0:4cdaf9b1e7d0 803 }
PavelSavyhin 0:4cdaf9b1e7d0 804 }
PavelSavyhin 0:4cdaf9b1e7d0 805 }
PavelSavyhin 0:4cdaf9b1e7d0 806 }
PavelSavyhin 0:4cdaf9b1e7d0 807 else
PavelSavyhin 0:4cdaf9b1e7d0 808 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 809
PavelSavyhin 0:4cdaf9b1e7d0 810 exit:
PavelSavyhin 0:4cdaf9b1e7d0 811 if (rc != SUCCESS)
PavelSavyhin 0:4cdaf9b1e7d0 812 cleanSession();
PavelSavyhin 0:4cdaf9b1e7d0 813 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 814 }
PavelSavyhin 0:4cdaf9b1e7d0 815
PavelSavyhin 0:4cdaf9b1e7d0 816
PavelSavyhin 0:4cdaf9b1e7d0 817 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 818 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
PavelSavyhin 0:4cdaf9b1e7d0 819 {
PavelSavyhin 0:4cdaf9b1e7d0 820 int rc;
PavelSavyhin 0:4cdaf9b1e7d0 821
PavelSavyhin 0:4cdaf9b1e7d0 822 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
PavelSavyhin 0:4cdaf9b1e7d0 823 goto exit; // there was a problem
PavelSavyhin 0:4cdaf9b1e7d0 824
PavelSavyhin 0:4cdaf9b1e7d0 825 #if MQTTCLIENT_QOS1
PavelSavyhin 0:4cdaf9b1e7d0 826 if (qos == QOS1)
PavelSavyhin 0:4cdaf9b1e7d0 827 {
PavelSavyhin 0:4cdaf9b1e7d0 828 if (waitfor(PUBACK, timer) == PUBACK)
PavelSavyhin 0:4cdaf9b1e7d0 829 {
PavelSavyhin 0:4cdaf9b1e7d0 830 unsigned short mypacketid;
PavelSavyhin 0:4cdaf9b1e7d0 831 unsigned char dup, type;
PavelSavyhin 0:4cdaf9b1e7d0 832 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
PavelSavyhin 0:4cdaf9b1e7d0 833 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 834 else if (inflightMsgid == mypacketid)
PavelSavyhin 0:4cdaf9b1e7d0 835 inflightMsgid = 0;
PavelSavyhin 0:4cdaf9b1e7d0 836 }
PavelSavyhin 0:4cdaf9b1e7d0 837 else
PavelSavyhin 0:4cdaf9b1e7d0 838 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 839 }
PavelSavyhin 0:4cdaf9b1e7d0 840 #elif MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 841 else if (qos == QOS2)
PavelSavyhin 0:4cdaf9b1e7d0 842 {
PavelSavyhin 0:4cdaf9b1e7d0 843 if (waitfor(PUBCOMP, timer) == PUBCOMP)
PavelSavyhin 0:4cdaf9b1e7d0 844 {
PavelSavyhin 0:4cdaf9b1e7d0 845 unsigned short mypacketid;
PavelSavyhin 0:4cdaf9b1e7d0 846 unsigned char dup, type;
PavelSavyhin 0:4cdaf9b1e7d0 847 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
PavelSavyhin 0:4cdaf9b1e7d0 848 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 849 else if (inflightMsgid == mypacketid)
PavelSavyhin 0:4cdaf9b1e7d0 850 inflightMsgid = 0;
PavelSavyhin 0:4cdaf9b1e7d0 851 }
PavelSavyhin 0:4cdaf9b1e7d0 852 else
PavelSavyhin 0:4cdaf9b1e7d0 853 rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 854 }
PavelSavyhin 0:4cdaf9b1e7d0 855 #endif
PavelSavyhin 0:4cdaf9b1e7d0 856
PavelSavyhin 0:4cdaf9b1e7d0 857 exit:
PavelSavyhin 0:4cdaf9b1e7d0 858 if (rc != SUCCESS)
PavelSavyhin 0:4cdaf9b1e7d0 859 cleanSession();
PavelSavyhin 0:4cdaf9b1e7d0 860 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 861 }
PavelSavyhin 0:4cdaf9b1e7d0 862
PavelSavyhin 0:4cdaf9b1e7d0 863
PavelSavyhin 0:4cdaf9b1e7d0 864
PavelSavyhin 0:4cdaf9b1e7d0 865 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 866 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
PavelSavyhin 0:4cdaf9b1e7d0 867 {
PavelSavyhin 0:4cdaf9b1e7d0 868 int rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 869 Timer timer(command_timeout_ms);
PavelSavyhin 0:4cdaf9b1e7d0 870 MQTTString topicString = MQTTString_initializer;
PavelSavyhin 0:4cdaf9b1e7d0 871 int len = 0;
PavelSavyhin 0:4cdaf9b1e7d0 872
PavelSavyhin 0:4cdaf9b1e7d0 873 if (!isconnected)
PavelSavyhin 0:4cdaf9b1e7d0 874 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 875
PavelSavyhin 0:4cdaf9b1e7d0 876 topicString.cstring = (char*)topicName;
PavelSavyhin 0:4cdaf9b1e7d0 877
PavelSavyhin 0:4cdaf9b1e7d0 878 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 879 if (qos == QOS1 || qos == QOS2)
PavelSavyhin 0:4cdaf9b1e7d0 880 id = packetid.getNext();
PavelSavyhin 0:4cdaf9b1e7d0 881 #endif
PavelSavyhin 0:4cdaf9b1e7d0 882
PavelSavyhin 0:4cdaf9b1e7d0 883 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
PavelSavyhin 0:4cdaf9b1e7d0 884 topicString, (unsigned char*)payload, payloadlen);
PavelSavyhin 0:4cdaf9b1e7d0 885 if (len <= 0)
PavelSavyhin 0:4cdaf9b1e7d0 886 goto exit;
PavelSavyhin 0:4cdaf9b1e7d0 887
PavelSavyhin 0:4cdaf9b1e7d0 888 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 889 if (!cleansession)
PavelSavyhin 0:4cdaf9b1e7d0 890 {
PavelSavyhin 0:4cdaf9b1e7d0 891 memcpy(pubbuf, sendbuf, len);
PavelSavyhin 0:4cdaf9b1e7d0 892 inflightMsgid = id;
PavelSavyhin 0:4cdaf9b1e7d0 893 inflightLen = len;
PavelSavyhin 0:4cdaf9b1e7d0 894 inflightQoS = qos;
PavelSavyhin 0:4cdaf9b1e7d0 895 #if MQTTCLIENT_QOS2
PavelSavyhin 0:4cdaf9b1e7d0 896 pubrel = false;
PavelSavyhin 0:4cdaf9b1e7d0 897 #endif
PavelSavyhin 0:4cdaf9b1e7d0 898 }
PavelSavyhin 0:4cdaf9b1e7d0 899 #endif
PavelSavyhin 0:4cdaf9b1e7d0 900
PavelSavyhin 0:4cdaf9b1e7d0 901 rc = publish(len, timer, qos);
PavelSavyhin 0:4cdaf9b1e7d0 902 exit:
PavelSavyhin 0:4cdaf9b1e7d0 903 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 904 }
PavelSavyhin 0:4cdaf9b1e7d0 905
PavelSavyhin 0:4cdaf9b1e7d0 906
PavelSavyhin 0:4cdaf9b1e7d0 907 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 908 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
PavelSavyhin 0:4cdaf9b1e7d0 909 {
PavelSavyhin 0:4cdaf9b1e7d0 910 unsigned short id = 0; // dummy - not used for anything
PavelSavyhin 0:4cdaf9b1e7d0 911 return publish(topicName, payload, payloadlen, id, qos, retained);
PavelSavyhin 0:4cdaf9b1e7d0 912 }
PavelSavyhin 0:4cdaf9b1e7d0 913
PavelSavyhin 0:4cdaf9b1e7d0 914
PavelSavyhin 0:4cdaf9b1e7d0 915 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 916 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
PavelSavyhin 0:4cdaf9b1e7d0 917 {
PavelSavyhin 0:4cdaf9b1e7d0 918 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
PavelSavyhin 0:4cdaf9b1e7d0 919 }
PavelSavyhin 0:4cdaf9b1e7d0 920
PavelSavyhin 0:4cdaf9b1e7d0 921
PavelSavyhin 0:4cdaf9b1e7d0 922 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
PavelSavyhin 0:4cdaf9b1e7d0 923 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
PavelSavyhin 0:4cdaf9b1e7d0 924 {
PavelSavyhin 0:4cdaf9b1e7d0 925 int rc = FAILURE;
PavelSavyhin 0:4cdaf9b1e7d0 926 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
PavelSavyhin 0:4cdaf9b1e7d0 927 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
PavelSavyhin 0:4cdaf9b1e7d0 928 if (len > 0)
PavelSavyhin 0:4cdaf9b1e7d0 929 rc = sendPacket(len, timer); // send the disconnect packet
PavelSavyhin 0:4cdaf9b1e7d0 930
PavelSavyhin 0:4cdaf9b1e7d0 931 if (cleansession)
PavelSavyhin 0:4cdaf9b1e7d0 932 cleanSession();
PavelSavyhin 0:4cdaf9b1e7d0 933 else
PavelSavyhin 0:4cdaf9b1e7d0 934 isconnected = false;
PavelSavyhin 0:4cdaf9b1e7d0 935 return rc;
PavelSavyhin 0:4cdaf9b1e7d0 936 }
PavelSavyhin 0:4cdaf9b1e7d0 937
PavelSavyhin 0:4cdaf9b1e7d0 938
PavelSavyhin 0:4cdaf9b1e7d0 939 #endif