7 years, 3 months ago.
Is MQTTClient thread safe?
It seems that calling an MQTTClient method such as "publish" causes a "Kernel error -6: Not allowed in ISR context" when it is called inside a Thread callback (or an EventQueue "call"/"call_every" callback function).
So, should it be called only from the main thread?
2 Answers
7 years, 3 months ago.
I think this was due to an error introduced in the latest MQTT commit. It has been reverted now. The following now works for me:
    #define logMessage printf

    #include "easy-connect.h"
    #include "MQTTNetwork.h"
    #include "MQTTmbed.h"
    #include "MQTTClient.h"

    EventQueue event_queue;
    Thread event_thread;

    char* topic = "mbed-sample";
    float version = 0.6;
    DigitalOut led(LED1);

    int arrivedcount = 0;

    void messageArrived(MQTT::MessageData& md)
    {
        MQTT::Message &message = md.message;
        logMessage("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
        logMessage("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
        ++arrivedcount;
    }

    unsigned int msg_id = 0;

    void send_qos0(MQTT::Client<MQTTNetwork, Countdown>* client)
    {
        printf("send_qos0\r\n");
        MQTT::Message message;

        // QoS 0
        char buf[100];
        sprintf(buf, "Hello World! QoS 0 message from app version %f\r\n", version);
        message.qos = MQTT::QOS0;
        message.retained = false;
        message.dup = false;
        message.payload = (void*)buf;
        message.payloadlen = strlen(buf)+1;
        message.id = ++msg_id;

        int old_ac = arrivedcount;

        int rc = client->publish(topic, message);
        // printf("publish for id %d returned %d\r\n", message.id, rc);
        while (old_ac == arrivedcount) {
            int y = client->yield(100);
            if (y == -1) {
                printf("yield failed...\r\n");
                break;
            }
            // printf("yield returned %d\r\n", y);
        }
        printf("retrieved message, exit'ing send_qos0\r\n");
    }

    void blink()
    {
        led = !led;
    }

    int main(int argc, char* argv[])
    {
        InterruptIn sw2(SW2);

        event_thread.start(callback(&event_queue, &EventQueue::dispatch_forever));

        Ticker t;
        t.attach(&blink, 0.5f);

        logMessage("HelloMQTT: version is %.2f\r\n", version);

        NetworkInterface* network = easy_connect(true);
        if (!network) {
            return -1;
        }

        MQTTNetwork mqttNetwork(network);

        MQTT::Client<MQTTNetwork, Countdown> client(mqttNetwork);

        const char* hostname = "m2m.eclipse.org";
        int port = 1883;
        logMessage("Connecting to %s:%d\r\n", hostname, port);
        int rc = mqttNetwork.connect(hostname, port);
        logMessage("mqttNetwork.connect returned %d\r\n", rc);

        MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
        data.MQTTVersion = 3;
        data.clientID.cstring = "mbed-sample";
        data.username.cstring = "testuser";
        data.password.cstring = "testpassword";

        logMessage("MQTT connect...\r\n");
        rc = client.connect(data);
        logMessage("rc from MQTT connect is %d\r\n", rc);

        logMessage("MQTT subscribe...\r\n");
        rc = client.subscribe(topic, MQTT::QOS2, messageArrived);
        logMessage("rc from MQTT subscribe is %d\r\n", rc);

        sw2.fall(event_queue.event(&send_qos0, &client));
        logMessage("press SW2 to send message\r\n");

        wait_ms(osWaitForever);
    }
For some reason yield is starting to return -1 after a few messages... Not sure if that's my network connection or something in the MQTT lib...
Regarding your error, you probably need to call yield() more frequently. The server will drop your network connection after a brief period (1.5x the keepalive interval, per the MQTT 3.1.1 specification) if you don't call yield() often enough for MQTTClient to send its ping message automatically.
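A minimal sketch of what "yield more frequently" could look like: call yield() in short slices and stop as soon as it reports failure. StubClient and pump are hypothetical names used only for illustration; the stub mirrors only the shape of the yield() call used in the listing above (0 on success, -1 on failure) so the sketch builds off-target.

```cpp
#include <cstdio>

// Hypothetical stand-in for MQTT::Client; only the shape of yield() is mirrored.
struct StubClient {
    int yield_calls = 0;
    int yield(unsigned long timeout_ms) {
        (void)timeout_ms;
        ++yield_calls;
        return 0;  // 0 = success, -1 = failure, as checked in the listing above
    }
};

// Call yield() in slices of slice_ms until roughly total_ms has been covered.
// Stops early and returns the failing code if yield() reports an error.
template <typename Client>
int pump(Client& client, unsigned long total_ms, unsigned long slice_ms) {
    if (slice_ms == 0) {
        slice_ms = 1;  // avoid a loop that never advances
    }
    int rc = 0;
    for (unsigned long elapsed = 0; elapsed < total_ms; elapsed += slice_ms) {
        rc = client.yield(slice_ms);
        if (rc != 0) {
            std::printf("yield failed, a reconnect is probably needed\n");
            break;
        }
    }
    return rc;
}
```

On a device, something like pump(client, 1000, 100) could be called repeatedly from the one thread that owns the client, keeping the gap between yield() calls well below the keepalive interval.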
posted by 11 Sep 2017
Sadly, reverting to the previous implementation of the Countdown class clashes with the latest Mbed OS release: I now get the following error when initializing Timer t with "t = Timer();":
[Error] Timer.h@50,0: #330-D: "mbed::NonCopyable<T> &mbed::NonCopyable<T>::operator=(const mbed::NonCopyable<T> &) [with T=mbed::Timer]" (declared at line 163 of "./mbed-os/platform/NonCopyable.h") is inaccessible
Any suggestions?
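One possible workaround, sketched under assumptions: since the error says Timer's copy assignment is inaccessible, the countdown can restart the existing timer in place with stop(), reset() and start() instead of assigning a fresh Timer. StubTimer below is a hypothetical off-target stand-in that mirrors only those Timer methods plus read_ms(); the Countdown shown is an illustration, not necessarily the library's actual class.

```cpp
#include <chrono>

// Hypothetical stand-in for mbed::Timer: non-copyable, like the real class.
class StubTimer {
    using Clock = std::chrono::steady_clock;
public:
    StubTimer() = default;
    StubTimer(const StubTimer&) = delete;
    StubTimer& operator=(const StubTimer&) = delete;  // why "t = Timer();" fails

    void start() { running_ = true; begin_ = Clock::now(); }
    void stop() {
        if (running_) {
            accumulated_ += Clock::now() - begin_;
            running_ = false;
        }
    }
    void reset() { accumulated_ = Clock::duration::zero(); begin_ = Clock::now(); }
    long read_ms() const {
        Clock::duration d = accumulated_;
        if (running_) {
            d += Clock::now() - begin_;
        }
        return static_cast<long>(
            std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
    }
private:
    Clock::duration accumulated_ = Clock::duration::zero();
    Clock::time_point begin_ = Clock::now();
    bool running_ = false;
};

// Countdown that restarts its timer in place rather than copy-assigning it.
class Countdown {
public:
    Countdown() : interval_end_ms_(0) {}
    explicit Countdown(long ms) : interval_end_ms_(0) { countdown_ms(ms); }

    bool expired() const { return t_.read_ms() >= interval_end_ms_; }
    long left_ms() const { return interval_end_ms_ - t_.read_ms(); }

    void countdown_ms(long ms) {
        t_.stop();
        interval_end_ms_ = ms;
        t_.reset();  // replaces "t = Timer();"
        t_.start();
    }
private:
    StubTimer t_;
    long interval_end_ms_;
};
```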
posted by 17 Sep 2017
7 years, 1 month ago.
The MQTTClient class is not thread safe: it is not intended to be used by multiple threads simultaneously. The MQTT library has, however, now been updated, which might help solve your problem.
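If several threads do need to share one client, one common approach is to serialize every call through a single lock. The sketch below assumes that approach and uses std::mutex so it builds off-target; on Mbed OS the equivalent would be an rtos::Mutex. StubClient and LockedClient are hypothetical names, and the stub's publish signature is simplified for illustration.

```cpp
#include <mutex>
#include <string>

// Hypothetical stand-in for the MQTT client; signatures are simplified.
struct StubClient {
    int published = 0;
    int publish(const std::string& topic, const std::string& payload) {
        (void)topic;
        (void)payload;
        ++published;
        return 0;
    }
    int yield(unsigned long timeout_ms) {
        (void)timeout_ms;
        return 0;
    }
};

// Wrapper that lets only one thread at a time touch the underlying client.
template <typename Client>
class LockedClient {
public:
    explicit LockedClient(Client& client) : client_(client) {}

    int publish(const std::string& topic, const std::string& payload) {
        std::lock_guard<std::mutex> guard(lock_);
        return client_.publish(topic, payload);
    }

    int yield(unsigned long timeout_ms) {
        // The lock is held for the whole yield, so keep timeout_ms short
        // to avoid starving threads that are waiting to publish.
        std::lock_guard<std::mutex> guard(lock_);
        return client_.yield(timeout_ms);
    }

private:
    Client& client_;
    std::mutex lock_;
};
```

A simpler alternative, in the spirit of the listing in the first answer, is to give the client to a single thread and have other threads post work to that thread's EventQueue.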