6 years, 9 months ago.

MQTT library yield function not working properly

I am working with an STM32 Discovery board. I have written this code to connect to an MQTT client. In the super loop, I want to check whether any characters are available to read from the serial terminal and, if so, publish them. In addition, I call the yield function with a timeout value in milliseconds to keep the connection alive. But the yield function is not working as documented. With the parameter set to 1L, it should wait for one millisecond, but instead it waits for a very long time the first time it is called. After that, the code keeps printing "first" and "second" continuously; that is, the delay drops to a very small value. Please help me understand this strange behavior of the yield function; I have tried everything but am still stuck.

Here is the code:

#include "easy-connect.h"
#include "MQTTNetwork.h"
#include "MQTTmbed.h"
#include "MQTTClient.h"
// Program version, printed in the start-up banner by main().
float version = 0.6;
// arrivedcount is not used anywhere in this listing. Note that main() declares its own
// local `rc`, which shadows the global one declared here.
int arrivedcount = 0,rc;
// Holds the character read from the serial port before it is published.
char buffer[50];
// Holds the initial "text message" payload published once at start-up.
char buffer1[50];
// Most recent character read from the serial port.
char c;
// Not used anywhere in this listing.
bool flag;
// Single message object reused for every publish.
MQTT::Message message;
// Runs during static initialisation, i.e. before main() starts.
// NOTE(review): the returned pointer is never checked for failure in this listing.
NetworkInterface* ipstack = easy_connect(true); 
 
// Network adapter and MQTT client built on top of the interface obtained above.
MQTTNetwork mqttnetwork(ipstack);
MQTT::Client<MQTTNetwork, Countdown> client = MQTT::Client<MQTTNetwork, Countdown>(mqttnetwork);

// Write index into `buffer`; main() resets it to 0 after every publish.
volatile int received=0;
// Serial port on the USBTX/USBRX pins, used for all console input and output.
Serial pc(USBTX,USBRX);
/**
 * Subscription callback (registered via client.subscribe() in main()).
 * Prints the header fields and the payload of the received message to the
 * serial console.
 *
 * @param md  Message data handed over by the MQTT client; only md.message is read.
 */
void messageArrived(MQTT::MessageData& md)
{
    const MQTT::Message &message = md.message;
    pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d \n", message.qos, message.retained, message.dup, message.id);
    // The '*' precision of "%.*s" must be an int. payloadlen is presumably a
    // size_t (TODO confirm against MQTTClient.h), so cast it explicitly instead
    // of relying on the two types having the same size on this target.
    pc.printf("Payload %.*s\n", static_cast<int>(message.payloadlen), static_cast<const char*>(message.payload));
}

int main()
{
   //.attach(callback_ex);
    pc.printf("HelloMQTT: version is %.2f \n", version);
    char* hostname = "m12.cloudmqtt.com";
    int port =16479;
    pc.printf("Connecting to %s:%d\r\n", hostname, port);
    int rc = mqttnetwork.connect(hostname, port);
    if (rc != 0)
        pc.printf("rc from TCP connect is %d \n", rc);

    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = 3;
    data.clientID.cstring = "1";
    data.username.cstring = "sxdzesyk";
    data.password.cstring = "dc_pY7Q7gOTw";
    rc = client.connect(data);
   wait(5);
    pc.printf("rc from MQTT connect is %d \n", rc);

  rc = client.subscribe("testing", MQTT::QOS0,messageArrived);
  wait(3);
  pc.printf("rc from MQTT subscribe is %d \n", rc);
  sprintf(buffer1,"text message");
  message.qos = MQTT::QOS1;
    message.retained = false;
    message.dup = false;
    message.payload = (void*)(buffer1);
    message.payloadlen = strlen(buffer1)+1;
    rc = client.publish("testout", message);  

  

while(1)
{
pc.printf("first");
client.yield(1000L);
pc.printf("second");
if(pc.readable())
{
 pc.printf("character found");
 c=pc.getc();
 buffer[received++]=c;
  pc.putc(c);
  message.qos = MQTT::QOS1;
    message.retained = false;
    message.dup = false;
    message.payload = (void*)buffer;
    message.payloadlen = strlen(buffer)+1;
    rc = client.publish("testout", message);      
    received=0;
}
}
}

2 Answers

6 years, 2 months ago.

I have the same problem on a Nucleo-F746ZG, using Ethernet. The call client.yield(100); only times out after about 90 seconds. Updating MQTT and mbed-os did not help. What could the problem be?

On the mbed console, I read:

[EasyConnect] Using Ethernet
[EasyConnect] Connected to Network successfully
[EasyConnect] IP address 192.168.0.102
Connecting to 192.168.0.101:1883
Connected!
Topic   : mbed-sample
Username: theuy
Password: theuy_mqtt
Waiting for client.yield(100) to time-out after about 90 seconds...
Message arrived: qos 0, retained 0, dup 0, packetid 57004
Payload: Hello World! (QoS0)
Arrived count: 1
QOS0: arrivedcount: 1
Failed: rc from unsubscribe was -1
Failed: rc from disconnect was -1
Finish 1 msgs

On Ubuntu, I use Mosquitto MQTT Broker/Server. On the terminal I read:

$ mosquitto_sub -v -t "mbed-sample" -u "user" -P "passwd"
mbed-sample Hello World! (QoS0)

The code is largely the same as HelloMQTT, some messages added, some shortened.

/*******************************************************************************
 * Copyright (c) 2014, 2015 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial API and implementation and/or initial documentation
 *    Ian Craggs - make sure QoS2 processing works, and add device headers
 *******************************************************************************/

 /**
  This is a sample program to illustrate the use of the MQTT Client library
  on the mbed platform.  The Client class requires two classes which mediate
  access to system interfaces for networking and timing.  As long as these two
  classes provide the required public programming interfaces, it does not matter
  what facilities they use underneath. In this program, they use the mbed
  system libraries.

 */

// Left disabled here. Presumably enables QoS 2 handling in the MQTT client
// library when defined (TODO confirm against MQTTClient.h); note that main()
// below subscribes with MQTT::QOS2 while this stays commented out.
//#define MQTTCLIENT_QOS2 1
// Baud rate applied to the serial port `pc` at the start of main().
#define BAUD_RATE       230400

#include "easy-connect.h"
#include "MQTTNetwork.h"
#include "MQTTmbed.h"
#include "MQTTClient.h"

// Number of messages received so far; incremented in messageArrived() and
// polled by the wait loop in main().
int arrivedcount = 0;

// Serial port on the USBTX/USBRX pins; main() sets its baud rate to BAUD_RATE.
Serial      pc(USBTX, USBRX);

/**
 * Subscription callback (registered via client.subscribe() in main()).
 * Prints the header fields and payload of the received message, then bumps
 * the global arrivedcount that main() is polling.
 *
 * @param md  Message data handed over by the MQTT client; only md.message is read.
 */
void messageArrived(MQTT::MessageData& md)
{
    const MQTT::Message &message = md.message;
    printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);
    // The '*' precision of "%.*s" must be an int. payloadlen is presumably a
    // size_t (TODO confirm against MQTTClient.h), so cast it explicitly instead
    // of relying on the two types having the same size on this target.
    printf("Payload: %.*s\n", static_cast<int>(message.payloadlen), static_cast<const char*>(message.payload));
    arrivedcount++;
    printf("Arrived count: %d\n", arrivedcount);
}

/**
 * HelloMQTT sample: brings up the network, connects to the broker, subscribes
 * to the topic, publishes one QoS 0 message to that same topic and waits until
 * the message has come back through the subscription, then tears everything
 * down again.
 *
 * @param argc  Unused.
 * @param argv  Unused.
 * @return 0 on a complete run, -1 if no network interface is available,
 *         otherwise the failing return code of the connect/subscribe step.
 */
int main(int argc, char* argv[])
{
    pc.baud(BAUD_RATE);

    printf("\n");
    printf("---------------------------\n");
    printf("Starting HelloMQTT\n");
    printf("---------------------------\n");

    NetworkInterface* network = easy_connect(true);
    if (!network) {
        return -1;
    }

    MQTTNetwork mqttNetwork(network);

    MQTT::Client<MQTTNetwork, Countdown> client(mqttNetwork);

    // Connection details
//    const char* hostname = "m2m.eclipse.org";
    const char* hostname = "192.168.0.101";
    const int port = 1883;

    printf("Connecting to %s:%d\n", hostname, port);

    int rc = mqttNetwork.connect(hostname, port);
    if (rc != 0) {
        printf("rc from TCP connect is %d\n", rc);
        return rc;  // nothing below can work without the TCP connection
    }
    printf("Connected!\n");

    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = 3;

    // MQTT Username, Password
    data.clientID.cstring = "mbed-sampl-nothing?";
    data.username.cstring = "user";
    data.password.cstring = "passwd";

    // MQTT Topic. String literals are const; binding one to a plain char* is
    // ill-formed in C++11.
    const char* topic = "mbed-sample";

    printf("Topic   : %s\n", topic);
    printf("Username: %s\n", data.username.cstring);
    printf("Password: %s\n", data.password.cstring);

    if ((rc = client.connect(data)) != 0) {
        printf("Failed: rc from MQTT connect is %d\n", rc);
        mqttNetwork.disconnect();
        return rc;
    }
    if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0) {
        printf("Failed: rc from MQTT subscribe is %d\n", rc);
        // Without the subscription the wait loop below could never finish.
        client.disconnect();
        mqttNetwork.disconnect();
        return rc;
    }

    MQTT::Message message;

    // QoS 0
    char buf[100];
    snprintf(buf, sizeof(buf), "Hello World! (QoS0)");
    message.qos = MQTT::QOS0;
    message.retained = false;
    message.dup = false;
    message.payload = (void*)buf;
    message.payloadlen = strlen(buf) + 1;  // length includes the terminating NUL
    if ((rc = client.publish(topic, message)) != 0) {
        printf("Failed: rc from MQTT publish is %d\n", rc);
    }

    // Service the client until messageArrived() has seen the message.
    while (arrivedcount < 1) {
        printf("Waiting for client.yield(100) to time-out after about 90 seconds...\n");
        client.yield(100);  // Times out after about 90 seconds
        printf("QOS0: arrivedcount: %d\n", arrivedcount);
    }

    if ((rc = client.unsubscribe(topic)) != 0) {
        printf("Failed: rc from unsubscribe was %d\n", rc);
    }
    if ((rc = client.disconnect()) != 0) {
        printf("Failed: rc from disconnect was %d\n", rc);
    }

    mqttNetwork.disconnect();

    printf("Finish %d msgs\n", arrivedcount);

    return 0;
}

A quick look suggests that the socket was opened in blocking mode, so the call blocks in MQTTSocket.h. The timers are not used, and if the socket hasn't been set to non-blocking, something very bad is going to happen: int read(unsigned char* buffer, int len, int timeout) { return mysock.recv((char*)buffer, len); }

posted by marian szczepkowski 17 Feb 2018
6 years, 2 months ago.

Changing the code in MQTTSocket.h appears to have fixed this problem for me. All the timeouts are ignored so I figured it's safe to set the timeout back to zero.

MQTTSocket.h

Change this 
// Reads up to `len` bytes from the socket into `buffer` and returns whatever
// mysock.recv() returns.
// NOTE: the `timeout` argument is never used in this version, so how long the
// call waits depends entirely on how the socket itself was configured.
int read(unsigned char* buffer, int len, int timeout) 
{ 
    return mysock.recv((char*)buffer, len); 
}

To this
/**
 * Reads up to `len` bytes from the socket into `buffer`, applying the caller's
 * timeout to the socket for the duration of the receive and resetting the
 * socket timeout to 0 afterwards.
 *
 * @param buffer   Destination for the received bytes.
 * @param len      Maximum number of bytes to receive.
 * @param timeout  Passed straight to mysock.set_timeout() (presumably
 *                 milliseconds - TODO confirm against the socket API).
 * @return The value returned by mysock.recv().
 */
int read(unsigned char* buffer, int len, int timeout)
{
    mysock.set_timeout(timeout);
    const int received = mysock.recv((char*)buffer, len);
    mysock.set_timeout(0);
    return received;
}