7 years, 4 months ago.

MQTT Client and threading model: how to send and receive at the same time?

Hi.

I'm struggling to work out which threading model I should use to send and receive MQTT messages at the same time, i.e. to handle both as independent, unsolicited events.

In the following sample (which points to a local MQTT broker whose address is defined at line 39), I try to publish a message whenever the user pushes the onboard button (debounced by a simple anti-glitch filter function) while logging received messages, i.e. messages published by other parties that match my subscription.

When I run the sample, received messages are logged as soon as they are published, but outgoing messages are held back (effectively in a queue of size 1) until another message arrives. In other words, a message that is due to be sent waits for the next incoming message before it is actually emitted.

Note also that if I remove the "_event_queue.call_every(100, yield_mqtt_client);" call (at line 178), the behavior is exactly the opposite: messages are sent as soon as the button is pressed, but received ones are delayed until the next outgoing message is emitted.

What pattern should I follow?

Thanks!

sample publishing and receiving messages at the same time

// change this to 1 to output messages to LCD instead of serial
#define USE_LCD 0

#if USE_LCD
#include "C12832.h"

// the actual pins are defined in mbed_app.json and can be overridden per target
C12832 lcd(LCD_MOSI, LCD_SCK, LCD_MISO, LCD_A0, LCD_NCS);

#define logMessage \
    lcd.cls();     \
    lcd.printf

#else

#define logMessage printf

#endif

#include "mbed.h"
#include "mbed_events.h"

#include <cstdlib>

#define MQTTCLIENT_QOS2 1

#include "easy-connect.h"

#include "MQTTNetwork.h"
#include "MQTTmbed.h"
#include "MQTTClient.h"

int arrivedcount = 0;
int sentcount = 0;

InterruptIn _blueButton(USER_BUTTON);
DigitalOut _led1(LED1);

const char *MQTT_BROKER_ADDRESS = "192.168.1.111";
const int MQTT_BROKER_PORT = 1883;

#define MQTT_BROKER_USERNAME NULL
#define MQTT_BROKER_PASSWORD NULL

#define MQTT_BROKER_CLIENT_ID "mbed"

#define RECEIVE_TOPIC "mbed-sample/device-rx"
#define SEND_TOPIC "mbed-sample/device-tx"

NetworkInterface *_network;
MQTTNetwork *_mqttNetwork;
MQTT::Client<MQTTNetwork, Countdown> *_client;

EventQueue _event_queue;
Thread _event_thread;

int _arrivedcount = 0;
unsigned int _msg_id = 0;

Timer _timer_latest_message;

Ticker _ticker_main;

void messageArrived(MQTT::MessageData &md)
{
    MQTT::Message &message = md.message;

    ++_arrivedcount;

    logMessage("Message #%d ('%.*s') arrived: qos %d, retained %d, dup %d, packetid %d\r\n", _arrivedcount, (int)message.payloadlen, (char *)message.payload, message.qos, message.retained, message.dup, message.id);
}

void yield_mqtt_client()
{
    _client->yield(10);
}

void send_message(MQTT::QoS qos)
{
    int rc = 0;
    MQTT::Message message;

    char buf[100];
    snprintf(buf, sizeof(buf), "Message #%u", ++_msg_id);

    message.qos = qos;
    message.retained = false;
    message.dup = false;
    message.payload = (void *)buf;
    message.payloadlen = strlen(buf) + 1;
    message.id = _msg_id;

    if ((rc = _client->publish(SEND_TOPIC, message)) == 0)
    {
        printf("Message %u published successfully\r\n", _msg_id);
    }
    else
    {
        printf("publish for message %u failed with return code %d\r\n", _msg_id, rc);
    }
}

void send_message_antiglitch(MQTT::QoS qos)
{
    if (_timer_latest_message.read_ms() > 100)
    {
        send_message(qos);
        _timer_latest_message.reset();
    }
}

void blink()
{
    _led1 = !_led1;
}

bool initNetwork()
{
    int rc;

    _network = easy_connect(true);

    if (!_network)
    {
        logMessage("Network initialization failed\r\n");
        return false;
    }

    _mqttNetwork = new MQTTNetwork(_network);
    _client = new MQTT::Client<MQTTNetwork, Countdown>(*_mqttNetwork);

    logMessage("Connecting to %s:%d...\r\n", MQTT_BROKER_ADDRESS, MQTT_BROKER_PORT);

    rc = _mqttNetwork->connect(MQTT_BROKER_ADDRESS, MQTT_BROKER_PORT);

    if (rc != 0)
    {
        logMessage("rc from TCP connect is %d\r\n", rc);
        return false;
    }

    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;

    data.MQTTVersion = 3;
    data.clientID.cstring = MQTT_BROKER_CLIENT_ID;
    data.username.cstring = MQTT_BROKER_USERNAME;
    data.password.cstring = MQTT_BROKER_PASSWORD;

    if ((rc = _client->connect(data)) != 0)
    {
        logMessage("rc from MQTT connect is %d\r\n", rc);
        return false;
    }

    if ((rc = _client->subscribe(RECEIVE_TOPIC, MQTT::QOS1, messageArrived)) != 0)
    {
        logMessage("rc from MQTT subscribe is %d\r\n", rc);
        return false;
    }

    logMessage("...connection to %s:%d and subscription to %s successful\r\n", MQTT_BROKER_ADDRESS, MQTT_BROKER_PORT, RECEIVE_TOPIC);

    return true;
}

int main(int argc, char *argv[])
{
    _event_thread.start(callback(&_event_queue, &EventQueue::dispatch_forever));

    _ticker_main.attach(&blink, 0.5f);

    _timer_latest_message.start();

    logMessage("MQTT send/receive sample started...\r\n");

    if (!initNetwork()) return -1;

    _event_queue.call_every(100, yield_mqtt_client);

    _blueButton.rise(_event_queue.event(send_message_antiglitch, MQTT::QOS1));

    logMessage("Press button to send a message\r\n");

    wait_ms(osWaitForever);
}

1 Answer

7 years, 3 months ago.

The first thing to note is that this library is not thread-safe (that was to be the job of the asynchronous MQTT client, which I have not yet written), so publish() and yield() calls should not be active at the same time. It was intended as a simple API for single-threaded applications.
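
If the client really does have to be reached from more than one thread, one way to keep publish() and yield() from being active at the same time is to put both behind a single lock. The following is only a minimal sketch of that idea in portable C++: StubClient and LockedClient are hypothetical names made up for illustration, std::mutex stands in for whatever mutex the target RTOS provides, and none of this is part of the MQTT library.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for the MQTT client, so the sketch builds off-target.
struct StubClient {
    int yields = 0;
    int publishes = 0;
    void yield(unsigned long /*timeout_ms*/) { ++yields; }
    int publish(const char * /*topic*/, const char * /*payload*/)
    {
        ++publishes;
        return 0;  // 0 means success, as in the sample above
    }
};

// Hypothetical wrapper: every call into the client takes the same mutex,
// so a yield() and a publish() can never run concurrently.
template <typename Client>
class LockedClient {
public:
    explicit LockedClient(Client &client) : client_(client) {}

    void yield(unsigned long timeout_ms)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        client_.yield(timeout_ms);
    }

    int publish(const char *topic, const char *payload)
    {
        assert(topic != nullptr);
        std::lock_guard<std::mutex> lock(mutex_);
        return client_.publish(topic, payload);
    }

private:
    Client &client_;
    std::mutex mutex_;
};
```

Two caveats with this sketch: a publish() request blocks for as long as a yield() is holding the lock, so the yield timeout should stay short, and calling publish() from inside a message callback would deadlock because std::mutex is not recursive.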

Because you are using QoS 1, the publish() call waits for the acknowledgement (PUBACK) from the broker before returning. Messages from the server are delivered to the messageArrived callback during that time, just as they are when yield() is called.

I think what is actually happening is that messages are being received during the publish() call. To maximise two-way traffic, the application would ideally always be in yield(), except when it wants to publish.
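
The "always in yield(), except when publishing" pattern can be sketched as a single loop on one thread, with the button interrupt doing nothing more than setting a flag. This is a rough illustration in portable C++, not tested on a board: FakeClient, publish_requested, on_button_pressed, service_once and service_loop are all hypothetical names, and on the target the two client calls would go to _client->yield() and _client->publish() from the sample.

```cpp
#include <atomic>
#include <cassert>
#include <cstdio>

// Hypothetical stand-in for the MQTT client, so the sketch builds off-target.
struct FakeClient {
    int yields = 0;
    int publishes = 0;
    void yield(unsigned long /*timeout_ms*/) { ++yields; }  // services incoming traffic
    int publish(const char *topic, const char *payload)
    {
        std::printf("publish %s: %s\n", topic, payload);
        ++publishes;
        return 0;
    }
};

// Set from the button interrupt handler; the handler does nothing else.
std::atomic<bool> publish_requested{false};

void on_button_pressed() { publish_requested = true; }

// One pass of the loop: publish if a request is pending, then return to yield().
template <typename Client>
void service_once(Client &client, unsigned long yield_ms)
{
    if (publish_requested.exchange(false)) {
        client.publish("mbed-sample/device-tx", "button pressed");
    }
    client.yield(yield_ms);
}

// Runs the loop a fixed number of times; firmware would loop forever instead.
template <typename Client>
void service_loop(Client &client, int iterations, unsigned long yield_ms = 100)
{
    assert(iterations >= 0);
    for (int i = 0; i < iterations; ++i) {
        service_once(client, yield_ms);
    }
}
```

With this shape, only one thread ever touches the client, and a button press is published at the next pass of the loop. The trade-off is latency: a press may wait up to one yield timeout before it goes out, and several presses within the same pass collapse into a single publish.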

(This will be a motivation to get on with the async API, which will consume more resources as it will need a background thread, and locking.)