7 years, 2 months ago.

Is MQTTClient thread safe?

It seems that calling MQTTClient method "publish", for instance, causes a "Kernel error -6: Not allowed in ISR context" when called inside a Thread callback (or a EventQueue "call"/"call_every" callback function).

So, should it be called only by main thread?

2 Answers

7 years, 2 months ago.

I think this was due to an error introduced in latest MQTT commit. It's reverted now. The following now works for me:

#define logMessage printf

#include "easy-connect.h"
#include "MQTTNetwork.h"
#include "MQTTmbed.h"
#include "MQTTClient.h"

EventQueue event_queue;
Thread event_thread;

char* topic = "mbed-sample";
float version = 0.6;

DigitalOut led(LED1);

int arrivedcount = 0;

void messageArrived(MQTT::MessageData& md)
{
    MQTT::Message &message = md.message;
    logMessage("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
    logMessage("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
    ++arrivedcount;
}

unsigned int msg_id = 0;

void send_qos0(MQTT::Client<MQTTNetwork, Countdown>* client) {
    printf("send_qos0\r\n");

    MQTT::Message message;

    // QoS 0
    char buf[100];
    sprintf(buf, "Hello World!  QoS 0 message from app version %f\r\n", version);

    message.qos = MQTT::QOS0;
    message.retained = false;
    message.dup = false;
    message.payload = (void*)buf;
    message.payloadlen = strlen(buf)+1;
    message.id = ++msg_id;

    int old_ac = arrivedcount;

    int rc = client->publish(topic, message);
    // printf("publish for id %d returned %d\r\n", message.id, rc);

    while (old_ac == arrivedcount) {
        int y = client->yield(100);
        if (y == -1) {
            printf("yield failed...\r\n");
            break;
        }
        // printf("yield returned %d\r\n", y);
    }
    printf("retrieved message, exit'ing send_qos0\r\n");
}

void blink() {
    led = !led;
}

int main(int argc, char* argv[])
{
    InterruptIn sw2(SW2);

    event_thread.start(callback(&event_queue, &EventQueue::dispatch_forever));

    Ticker t;
    t.attach(&blink, 0.5f);

    logMessage("HelloMQTT: version is %.2f\r\n", version);

    NetworkInterface* network = easy_connect(true);
    if (!network) {
        return -1;
    }

    MQTTNetwork mqttNetwork(network);
    MQTT::Client<MQTTNetwork, Countdown> client(mqttNetwork);

    const char* hostname = "m2m.eclipse.org";
    int port = 1883;
    logMessage("Connecting to %s:%d\r\n", hostname, port);
    int rc = mqttNetwork.connect(hostname, port);
    logMessage("mqttNetwork.connect returned %d\r\n", rc);

    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = 3;
    data.clientID.cstring = "mbed-sample";
    data.username.cstring = "testuser";
    data.password.cstring = "testpassword";
    logMessage("MQTT connect...\r\n");
    rc = client.connect(data);
    logMessage("rc from MQTT connect is %d\r\n", rc);

    logMessage("MQTT subscribe...\r\n");
    rc = client.subscribe(topic, MQTT::QOS2, messageArrived);
    logMessage("rc from MQTT subscribe is %d\r\n", rc);

    sw2.fall(event_queue.event(&send_qos0, &client));
    logMessage("press SW2 to send message\r\n");

    wait_ms(osWaitForever);
}

For some reason yield is starting to return -1 after a few messages... Not sure if that's my network connection or something in the MQTT lib...

Regarding your error, you probably need to yield() more frequently. The server will disconnect your network connection after a brief period (2x keepalive timeout) if you don't yield() enough for MQTTClient to autosend the ping message.

posted by Andrew Domaszek 11 Sep 2017

Sadly, reverting to previous implementation of Countdown class clashes with latest mbedos release, since now I get following error when initializing Timer t with "t = Timer(); ":

[Error] Timer.h@50,0: #330-D: "mbed::NonCopyable<T> &mbed::NonCopyable<T>::operator=(const mbed::NonCopyable<T> &) [with T=mbed::Timer]" (declared at line 163 of "./mbed-os/platform/NonCopyable.h") is inaccessible

Any suggestions?

posted by Lorenzo Maiorfi 17 Sep 2017

We're working on a solution, for now remain on mbed OS 5.4 (or remove the NonCopyable trait from Timer class, but that's not a good solution :-)).

posted by Jan Jongboom 21 Sep 2017
7 years ago.

The MQTTClient class is not thread safe in that it is not intended that multiple threads use it simultaneously. The MQTT library has however now been updated which might help solve your problem.