Example node for Yodiwo's Plegma API

Dependencies:   EthernetInterface FXOS8700Q HTTPClient HTTPD MQTTS SDFileSystem YodiwoPlegma mbed-rpc mbed-rtos mbed wolfSSL

mqtt_helpers.cpp

Committer:
mitsarionas
Date:
2015-09-28
Revision:
8:66d34592c1ad
Parent:
5:1ef168357347

File content as of revision 8:66d34592c1ad:

#include "mbed.h"
#include "rtos.h"

#include <stdint.h>

#include "MQTTClient.h"
#include "MQTTSocket.h"

#include "mqtt_helpers.h"
#include "yodiwo_functions.h"
#include "jsmn.h"
#include "yodiwo_helpers.h"

///////////////////////////////// NETWORK
//MQTTEthernet ipstack;
// Transport object handed to the MQTT client; it is reset and connected in mqtt_init().
MQTTSocket ipstack;
// MQTT client instance; allocated in mqtt_init(), used by publisher() and
// passed to mqtt_thread_func() as its thread argument.
MQTT::Client<MQTTSocket, Countdown, MAX_MSG_LEN> *client;

///////////////////////////////// MQTT
// Thread running mqtt_thread_func(); created at the end of a successful mqtt_init().
Thread *mqtt_thread;
void mqtt_thread_func(void const *args);

// Topic filter this node subscribes to; filled in by mqtt_init().
char mqtt_topic_sub[MAX_TOPIC_LEN];
// Topic prefix for outgoing messages; filled in by mqtt_init(), and publisher()
// appends the message type to it.
char mqtt_topic_pub[MAX_TOPIC_LEN];


/**
 * Connect to the MQTT broker, subscribe to this node's topic and start the
 * thread that services the connection.
 *
 * Steps, in order (the function stops at the first one that fails):
 *   1. build mqtt_topic_sub / mqtt_topic_pub from the node key,
 *   2. open the transport through the global ipstack,
 *   3. MQTT CONNECT using nodeKeyS as client id and user name, nodeSecret as password,
 *   4. subscribe to mqtt_topic_sub with QoS 0 (messages go to on_mqtt_message),
 *   5. spawn mqtt_thread running mqtt_thread_func.
 *
 * @param hostname   broker host name
 * @param port       broker port
 * @param certfile   handed to ipstack.connect() only when port > 8000; NULL is passed otherwise
 * @param nodeKeyS   node key in string form
 * @param nodeSecret password for the MQTT CONNECT
 * @return 0 on success; otherwise the non-zero code of the failing step, or -1
 *         when a topic string does not fit in MAX_TOPIC_LEN.
 */
int mqtt_init(char *hostname, int port, char *certfile, char *nodeKeyS, char *nodeSecret)
{
//    initialize_things(nodeKeyS);
    Yodiwo_Plegma_NodeKey_t nodeKey;
    NodeKey_FromString(&nodeKey, nodeKeyS);

    // Bounded formatting: the topic buffers are fixed-size, so reject keys that do not fit
    // instead of writing past the end of the arrays.
    int written = snprintf(mqtt_topic_sub, sizeof(mqtt_topic_sub),
                           "/api/out/" YODIWO_API_VERSION_STR "/%s/#", nodeKeyS);
    if (written < 0 || written >= (int)sizeof(mqtt_topic_sub)) {
        printf("MQTT init failed: subscribe topic does not fit in %d bytes\n", (int)sizeof(mqtt_topic_sub));
        return -1;
    }
    written = snprintf(mqtt_topic_pub, sizeof(mqtt_topic_pub),
                       "/api/in/" YODIWO_API_VERSION_STR "/%s/%s/", nodeKey.UserKey.UserID, nodeKeyS);
    if (written < 0 || written >= (int)sizeof(mqtt_topic_pub)) {
        printf("MQTT init failed: publish topic does not fit in %d bytes\n", (int)sizeof(mqtt_topic_pub));
        return -1;
    }
    printf("topic to subscribe: %s\n", mqtt_topic_sub);

    // NOTE: the client is intentionally not freed on the failure paths below,
    // because publisher() dereferences the global pointer unconditionally.
    client = new MQTT::Client<MQTTSocket, Countdown, MAX_MSG_LEN>(ipstack);
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = 4;
    data.clientID.cstring = nodeKeyS;
    data.username.cstring = nodeKeyS;
    data.password.cstring = nodeSecret;
    ipstack = MQTTSocket();

    printf("connecting to MQTT broker: %s:%d\n", hostname, port);

    int rc = ipstack.connect(hostname, port, port > 8000 ? certfile : NULL);
    if (rc != 0) {
        printf("rc from TCP connect is %d\n", rc);
        printf("MQTT init failed: %d\n", rc);
        return rc;
    }

    rc = client->connect(data);
    if (rc != 0) {
        printf("rc from MQTT connect is %d\n", rc);
        printf("MQTT init failed: %d\n", rc);
        return rc;
    }
    printf("MQTT connected\n") ;

    rc = client->subscribe(mqtt_topic_sub, MQTT::QOS0, on_mqtt_message);
    if (rc != 0) {
        printf("rc from MQTT subscribe is %d\n", rc);
        printf("MQTT init failed: %d\n", rc);
        return rc;
    }
    printf("Subscribed\n") ;

    // Only start servicing the connection once every step above has succeeded.
    mqtt_thread = new Thread(mqtt_thread_func, (void *)client, osPriorityNormal, 10000);
    return 0;
}

void mqtt_thread_func(void const *args)
{
    MQTT::Client<MQTTSocket, Countdown, MAX_MSG_LEN> *client = (MQTT::Client<MQTTSocket, Countdown, MAX_MSG_LEN>*)args;
    while(1)
        client->yield(100);
}


/**
 * Subscription callback registered in mqtt_init(): logs the incoming message
 * and forwards payload and topic to yodiwo_handle_message().
 *
 * @param md  message data supplied by the MQTT client (message plus topic name)
 *
 * Topic and payload are length-delimited, so they are printed with "%.*s".
 * That conversion requires its precision argument to be an int, hence the
 * explicit casts on the two lengths.
 */
void on_mqtt_message(MQTT::MessageData &md)
{
    MQTT::Message &message = md.message;

    printf("%.*s\n", static_cast<int>(md.topicName.lenstring.len), md.topicName.lenstring.data);
    printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);
    printf("Payload %.*s\n", static_cast<int>(message.payloadlen), (char*)message.payload);

    yodiwo_handle_message((char *)message.payload, message.payloadlen, md.topicName.lenstring.data, md.topicName.lenstring.len);
}

/**
 * Publish a message with QoS 0 on the topic mqtt_topic_pub + msg_type.
 *
 * @param msg       message buffer
 * @param msg_len   length of msg; msg_len - 1 bytes are sent, i.e. the last byte
 *                  is dropped (presumably a trailing NUL terminator; confirm with callers)
 * @param msg_type  string appended to mqtt_topic_pub to form the full topic
 * @return the result of client->publish(), or -1 when the call is rejected
 *         (client not created yet, NULL argument, msg_len < 1, or the topic
 *         does not fit in MAX_TOPIC_LEN).
 */
int publisher(char *msg, int msg_len, char *msg_type)
{
    // client is only set by mqtt_init(); msg_len < 1 would make payloadlen negative.
    if (client == NULL || msg == NULL || msg_type == NULL || msg_len < 1) {
        printf("publish rejected: MQTT not initialized or invalid arguments\n");
        return -1;
    }

    // Build the topic with a bounded write so a long msg_type cannot overflow the buffer.
    char topic[MAX_TOPIC_LEN];
    int written = snprintf(topic, sizeof(topic), "%s%s", mqtt_topic_pub, msg_type);
    if (written < 0 || written >= (int)sizeof(topic)) {
        printf("publish rejected: topic does not fit in %d bytes\n", (int)sizeof(topic));
        return -1;
    }

    MQTT::Message message;
    message.qos = MQTT::QOS0;
    message.retained = false;
    message.dup = false;
    message.id = 0;     // previously left uninitialized
    message.payload = (void*)msg;
    message.payloadlen = msg_len - 1;

    printf("publishing to %s\n", topic);
    printf("content: %.*s\n", msg_len, msg);
    printf("length: %d\n", msg_len);
    int r = client->publish(topic, message);
    printf("publish returned %d\n", r);
    return r;
}