Had to fork with a different name, because of some incompatibility issues.

Dependencies:   MQTT

DeviceClient.cpp

Committer:
lamell
Date:
2020-09-14
Revision:
29:40cc05c6c14b
Parent:
27:3806829a0247

File content as of revision 29:40cc05c6c14b:

/*******************************************************************************
 * Copyright (c) 2015 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Sathisumar Palaniappan - initial implementation
 *    Sathisumar Palaniappan - added reconnect logic and isConnected() method
 *    Lokesh K Haralakatta - Port to mbed OS 5 support
 *    Lokesh K Haralakatta - Added SSL/TLS Support
 *******************************************************************************/
#include "MQTTClient.h"
#include "DeviceClient.h"

// File-scope semaphore constructed with a count of 3. The debug() calls in this
// file are bracketed by semaIoT.try_acquire_for(500) / semaIoT.release(); the
// result of the timed acquire is never checked.
// NOTE(review): not declared static, so it has external linkage — confirm
// whether other translation units are meant to share it.
Semaphore   semaIoT(3);

// Ethernet link-status wrapper. K64F and LPC1768 do not share a name for their
// MII read helpers, so each target family gets its own linkStatus().
// reConnect() only attempts a connection when linkStatus() is non-zero.
#if defined(TARGET_UBLOX_C027) || defined(TARGET_K64F) || defined(TARGET_DISCO_F746NG)

/** No link query is wired up for these targets: always report the link as up. */
static uint32_t linkStatus(void)
{
    return 1;
}
#elif defined(TARGET_LPC1768)
#include "lpc_phy.h"

/** Returns bit 0 of the value read back by lpc_mii_read_data(). */
static uint32_t linkStatus(void)
{
    return (lpc_mii_read_data() & 1);
}
#else

/**
 * Fallback for every other target so that reConnect() still compiles: behave
 * like the boards above that have no link query and report the link as up.
 */
static uint32_t linkStatus(void)
{
    return 1;
}
#endif

// Make the IoTF namespace visible for the rest of this file (IoTF::Command is used below).
using namespace IoTF;

// Application command callback. Starts as NULL and is replaced by
// DeviceClient::setCommandCallback(); msgArrived() returns early while it is NULL.
CommandHandler handler = NULL;
// Forward declaration; defined further down and passed to mqttClient->subscribe()
// in DeviceClient::subscribeToCommands().
void msgArrived(MQTT::MessageData& md);

/**
 * Device-side client for the IBM Internet of Things Foundation.
 * It lets a device publish events to, and receive commands from, the IBM IoT
 * Foundation with simple function calls.
 *
 * Default constructor: all pointer members start as NULL, the port as 0 and the
 * client is flagged as not connected. No network or MQTT object is created here.
 */
DeviceClient::DeviceClient()
    : org(NULL),
      deviceType(NULL),
      deviceId(NULL),
      authMethod(NULL),
      authToken(NULL),
      mqttNetwork(NULL),
      mqttClient(NULL),
      connected(false),
      port(0)
{
    // Trace the construction; debug output is bracketed by the shared semaphore.
    semaIoT.try_acquire_for(500);
    debug("Constructor#1 called::\r\n");
    semaIoT.release();
}

/**
 * Construct a client without credentials.
 *
 * @param orgId   organization id; may be NULL or empty (connect() then falls back to QUICKSTART)
 * @param typeId  device type; may be NULL
 * @param id      device id; may be NULL
 * @param port    port stored for the later connection attempt
 *
 * The string pointers are stored as given (no copy is made). A warning is
 * logged when the organization is something other than quickstart, because no
 * auth token is supplied by this constructor.
 */
DeviceClient::DeviceClient(char *orgId, char *typeId, char *id, int port):org(orgId),deviceType(typeId),
        deviceId(id),authMethod(NULL),authToken(NULL),mqttNetwork(NULL),mqttClient(NULL),connected(false), port(port)
{
    semaIoT.try_acquire_for(500);
    debug("Constructor#2 called:: org=%s, type=%s, id=%s\r\n", (org==NULL)?"NULL":org,
                    (deviceType==NULL)?"NULL":deviceType, (deviceId==NULL)?"NULL":deviceId);
    semaIoT.release();

    // A NULL or empty organization is treated as quickstart by connect(), so it
    // must neither be handed to strcmp() (NULL) nor trigger the warning (empty).
    const bool quickstartFlow = (org == NULL) || (org[0] == '\0') || (strcmp(org, QUICKSTART) == 0);
    if(!quickstartFlow) {
        semaIoT.try_acquire_for(500);
        debug("Registered flow must provide valid token\r\n");
        semaIoT.release();
    }

    mqttNetwork = new MQTTNetwork();
    mqttClient = new MQTT::Client<MQTTNetwork, Countdown>(*mqttNetwork);
}

/**
 * Construct a client for the registered flow.
 *
 * Stores the organization, device type, device id, auth method and auth token
 * pointers as given (no copies), remembers the port, and creates the
 * MQTTNetwork and MQTT::Client objects used by the other methods.
 */
DeviceClient::DeviceClient(char *orgId, char *typeId,char *id, char *method, char *token, int port)
    : org(orgId),
      deviceType(typeId),
      deviceId(id),
      authMethod(method),
      authToken(token),
      connected(false),
      port(port)
{
    // Printable stand-ins for the log line; the token is deliberately left out
    // of the output for security reasons.
    const char* const orgText  = (org == NULL) ? "NULL" : org;
    const char* const typeText = (deviceType == NULL) ? "NULL" : deviceType;
    const char* const idText   = (deviceId == NULL) ? "NULL" : deviceId;

    semaIoT.try_acquire_for(500);
    debug("Constructor#3 called:: org=%s, type=%s, id=%s\r\n", orgText, typeText, idText);
    semaIoT.release();

    mqttNetwork = new MQTTNetwork();
    mqttClient = new MQTT::Client<MQTTNetwork, Countdown>(*mqttNetwork);
}

/**
 * Connect to the IBM Internet of Things Foundation
 */
bool DeviceClient::connect()
{
    char *organizationName, *typeId, *id;
    bool rc = false;
    // Check if any organization is set
    if(this->org == NULL || (strcmp("", this->org) == 0))
    {
        organizationName = (char*)QUICKSTART;
    } else {
        organizationName = this->org;
    }

    // Check if device type is already mentioned
    if(this->deviceType == NULL || (strcmp("", this->deviceType) == 0))
    {
        typeId = (char*)"iotsample-mbed";
    } else {
        typeId = this->deviceType;
    }

    char hostname[strlen(organizationName) + strlen(IBM_IOT_MESSAGING) + 1];
    sprintf(hostname, "%s%s", organizationName, IBM_IOT_MESSAGING);

    //NetworkInterface* net = mqttNetwork->getEth();
    NetworkInterface* net = mqttNetwork->network;
    //EthernetInterface  net = mqttNetwork->network;

    const char* ip = net->get_ip_address();

    // Get devices MAC address if deviceId is not set already
    if(this->deviceId == NULL || (strcmp("", this->deviceId) == 0))
    {
        char tmpBuf[50];
        id = getMac(tmpBuf, sizeof(tmpBuf));
    } else {
        id = this->deviceId;
    }

    // Construct clientId - d:org:type:id
    char clientId[strlen(organizationName) + strlen(typeId) + strlen(id) + 5];
    sprintf(clientId, "d:%s:%s:%s", organizationName, typeId, id);

    // Initialize MQTT Connect
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = 4;
    data.clientID.cstring = clientId;

    int quickstartMode = (strcmp(organizationName, QUICKSTART) == 0);

    if (!quickstartMode)
    {
        data.username.cstring = (char*)"use-token-auth";
        data.password.cstring = this->authToken;

        //Check and initialize appropriate port
        if(port == 1883)
            port = MQTT_TLS_PORT;
    }

    logData(net, hostname, clientId);

    if(ip){
       rc = tryConnect(hostname, data);
       // By default subscribe to commands if we are in registered flow
       if(rc == true && !quickstartMode)
       {
           subscribeToCommands();
       }
       if(rc == true)
       {
          connected = true;
          semaIoT.try_acquire_for(500);
          debug("Device Client Connected to %s:%d\r\n",hostname,port);
          semaIoT.release();
        }
    }
    else
       semaIoT.try_acquire_for(500);
       debug("No IP Assigned to Network Interface...\r\n");
       semaIoT.release();

    return rc;
}

/**
 * Reconnect when the connection is lost. This method disconnects the active connection if any
 * and tries to initiate a fresh connection.
 * This method uses the Ethernet Link status wherever applicable while reconnecting. i.e, tries to
 * initiate the connection only when the Ethernet cable is plugged in.
 */
bool DeviceClient::reConnect()
{
    semaIoT.try_acquire_for(500);
    debug("DeviceClient::reConnect() entry and connected = %s\r\n",(connected == true)?"true":"false");
    semaIoT.release();

    if(connected == true)
    {
        disconnect();
    }

    if(linkStatus())
    {
        //NetworkInterface* net = mqttNetwork->getEth();
        NetworkInterface* net = mqttNetwork->network;
        //EthernetInterface  net = mqttNetwork->network;

        if(net->connect() == 0)
        {
            bool status = connect();
            if(status == false)
            {
                net->disconnect();
            }
            return status;
        }
    }
    return false;
}

/**
 * Open the transport to hostname:port and perform the MQTT connect, retrying
 * with a growing delay (see getConnTimeout()).
 *
 * @param hostname  broker host name
 * @param data      prepared MQTT connect options
 * @return true once the broker answers MQTT_CONNECTION_ACCEPTED; false when the
 *         broker rejects the credentials (MQTT_NOT_AUTHORIZED or
 *         MQTT_BAD_USERNAME_OR_PASSWORD), in which case no retry is made.
 *
 * After the fifth failed attempt the board is reset via NVIC_SystemReset().
 */
bool DeviceClient::tryConnect(char *hostname, MQTTPacket_connectData &data)
{
    int retryAttempt = 0;
    do {
        // Transport (socket) connect first.
        int rc = mqttNetwork->connect(hostname, port);

        semaIoT.try_acquire_for(500);
        debug("%d\r\n", rc);
        semaIoT.release();

        if (rc != 0)
        {
            semaIoT.try_acquire_for(500);
            debug("mqttNetwork connect returned: %d\r\n", rc);
            semaIoT.release();
        }
        else
        {
            // MQTT connect - only meaningful once the transport is up.
            rc = mqttClient->connect(data);

            semaIoT.try_acquire_for(500);
            debug("%d\r\n", rc);
            semaIoT.release();

            if (rc == MQTT_CONNECTION_ACCEPTED)
            {
                return true;
            }

            semaIoT.try_acquire_for(500);
            debug("MQTT connect returned %d\r\n", rc);
            semaIoT.release();

            // NOTE(review): the transport is closed so the next attempt starts
            // from a clean socket — confirm MQTTNetwork::connect() can be
            // called again after disconnect().
            mqttNetwork->disconnect();

            if (rc == MQTT_NOT_AUTHORIZED || rc == MQTT_BAD_USERNAME_OR_PASSWORD)
                return false; // don't reattempt to connect if credentials are wrong
        }

        int timeout = getConnTimeout(++retryAttempt);

        semaIoT.try_acquire_for(500);
        debug("Retry attempt number %d waiting %d\r\n", retryAttempt, timeout);
        semaIoT.release();

        // enough retry is done - reset the board instead of returning to the application
        if (retryAttempt == 5){
            NVIC_SystemReset();
            return false;   // only reached if the reset call returns
        } else {
            wait(timeout);
        }
    } while(true);
}

/**
 * Dump the connection configuration to the debug output: IP address, MAC
 * address, gateway and netmask of the given interface, followed by the server
 * host name, the stored port and the MQTT client id.
 */
void DeviceClient::logData(NetworkInterface* net, char *hostname, char *clientId)
{
    // Separator line printed before and after the details.
    static const char* const separator = "=====================================\r\n";

    semaIoT.try_acquire_for(500);

    debug("%s", separator);
    debug("Connection Config Details:\r\n");

    // Interface details
    debug("IP ADDRESS: %s\r\n", net->get_ip_address());
    debug("MAC ADDRESS: %s\r\n", net->get_mac_address());
    debug("Gateway: %s\r\n", net->get_gateway());
    debug("Network Mask: %s\r\n", net->get_netmask());

    // Broker details
    debug("Server Hostname: %s\r\n", hostname);
    debug("Server Port: %d\r\n", port);
    debug("Client ID: %s\r\n", clientId);
    debug("%s", separator);

    semaIoT.release();
}

/**
 * Delay to wait before the next connection attempt.
 * Grows quadratically with the attempt number: attemptNumber^2 * 5.
 */
int DeviceClient::getConnTimeout(int attemptNumber)
{
    const int squared = attemptNumber * attemptNumber;
    return squared * 5;
}


/**
 * Returns the connection status, connected or disconnected.
 */
bool DeviceClient::isConnected() {
    return mqttClient->isConnected();
}

/**
 * Publish data to the IBM Internet of Things Foundation. Note that data is published
 * by default at Quality of Service (QoS) 0, which means that a successful send
 * does not guarantee receipt even if the publish has been successful.
 *
 * @param eventName  event name placed in the topic iot-2/evt/<eventName>/fmt/json
 * @param data       NUL-terminated payload; its strlen() is sent
 * @param qos        MQTT quality of service for the message
 * @return true when the MQTT publish call returned 0; false when the client is
 *         not connected, an argument is NULL, or the publish failed.
 */
bool DeviceClient::publishEvent(char *eventName, char *data, MQTT::QoS qos)
{
    // mqttClient is NULL after the default constructor; treat that as "not connected".
    if(mqttClient == NULL || !mqttClient->isConnected())
    {
        semaIoT.try_acquire_for(500);
        debug("Client is not connected \r\n");
        semaIoT.release();

        return false;
    }

    if(eventName == NULL || data == NULL)
    {
        semaIoT.try_acquire_for(500);
        debug("Event name and payload must not be NULL\r\n");
        semaIoT.release();

        return false;
    }

    /* Topic format must be iot-2/evt/<eventName>/fmt/json (let us stick to json format for now).
     * The buffer holds prefix + event name + suffix + terminating NUL; the
     * prefix/suffix lengths come from sizeof so they cannot drift from the text.
     */
    static const char topicPrefix[] = "iot-2/evt/";
    static const char topicSuffix[] = "/fmt/json";
    const size_t topicSize = (sizeof(topicPrefix) - 1) + strlen(eventName) + (sizeof(topicSuffix) - 1) + 1;

    char topic[topicSize];
    snprintf(topic, topicSize, "%s%s%s", topicPrefix, eventName, topicSuffix);

    MQTT::Message message;
    message.qos = qos;
    message.retained = false;
    message.dup = false;
    message.payload = (void*)data;
    message.payloadlen = strlen(data);

    semaIoT.try_acquire_for(500);
    debug("Publishing %s\r\n", data);
    semaIoT.release();

    int rc = mqttClient->publish(topic, message);
    return rc == 0;
}

/**
 * Register the function that msgArrived() invokes for every incoming command.
 * The callback is stored in the file-level `handler` variable, so it is shared
 * by all DeviceClient instances in this translation unit.
 */
void DeviceClient::setCommandCallback(CommandHandler callbackFunc)
{
    ::handler = callbackFunc;
}
/**
 * Subscribe to commands from the application. This will be executed only for
 * registered flow (quickstart flow does not support command publish).
 *
 * Subscribes to the topic filter iot-2/cmd/+/fmt/+ at QoS 2 with msgArrived()
 * as the message callback.
 *
 * @return the return code of the MQTT subscribe call (0 on success).
 */
int DeviceClient::subscribeToCommands()
{
    // iot-2/cmd/+/fmt/+
    const int rc = mqttClient->subscribe("iot-2/cmd/+/fmt/+", MQTT::QOS2, msgArrived);
    if (rc != 0)
    {
        // Braces matter here: the acquire, the log and the release must stay
        // together, otherwise the release runs without a matching acquire.
        semaIoT.try_acquire_for(500);
        debug("rc from MQTT subscribe is %d\r\n", rc);
        semaIoT.release();
    }
    return rc;
}

/**
 * Callback method to be registered with MQTT::Client. MQTT::Client calls whenever
 * any command is published to the topic subscribed earlier.
 *
 * Expected topic layout: iot-2/cmd/<name>/fmt/<format>. The command name,
 * the format and a NUL-terminated copy of the payload are wrapped in an
 * IoTF::Command and handed to the registered handler. Messages are ignored
 * when no handler is registered or when the topic does not match the layout.
 *
 * @param md  message data (topic name and message) supplied by the MQTT client
 */
void msgArrived(MQTT::MessageData& md)
{
    // check whether callback is registered by the client code
    if(handler == NULL)
    {
        return;
    }

    MQTT::Message &message = md.message;

    const int topicLen = md.topicName.lenstring.len;
    if(topicLen <= 0 || md.topicName.lenstring.data == NULL)
    {
        return;
    }

    // NUL-terminated copy of the (length-delimited) topic name
    char topic[topicLen + 1];
    memcpy(topic, md.topicName.lenstring.data, topicLen);
    topic[topicLen] = '\0';

    // NUL-terminated copy of the payload; the raw payload is length-delimited
    // and must not be printed or passed on as a C string directly.
    const size_t payloadLen = message.payloadlen;
    char payload[payloadLen + 1];
    if(payloadLen > 0 && message.payload != NULL)
    {
        memcpy(payload, message.payload, payloadLen);
        payload[payloadLen] = '\0';
    } else {
        payload[0] = '\0';
    }

    semaIoT.try_acquire_for(500);
    debug("Message arrived on topic %s: Length: %lu. Payload: %s\r\n",  topic, (unsigned long)payloadLen, payload);
    semaIoT.release();

    // Command topic: iot-2/cmd/blink/fmt/json - cmd is the string between cmd/ and /fmt/
    char* cmdStart = strstr(topic, "/cmd/");
    // Look for /fmt/ only after the command name so the length can never be negative.
    char* fmtSep = (cmdStart != NULL) ? strstr(cmdStart + 5, "/fmt/") : NULL;
    if(fmtSep == NULL)
    {
        semaIoT.try_acquire_for(500);
        debug("Ignoring message on unexpected topic %s\r\n", topic);
        semaIoT.release();
        return;
    }
    cmdStart += 5;

    const size_t nameLen = fmtSep - cmdStart;
    char name[nameLen + 1];
    memcpy(name, cmdStart, nameLen);
    name[nameLen] = '\0';

    // The format is everything after /fmt/; size the buffer from its real length.
    const char* fmtStart = fmtSep + 5;
    char format[strlen(fmtStart) + 1];
    strcpy(format, fmtStart);

    IoTF::Command cmd(name, format, payload);
    (*handler)(cmd);
}

/**
 * Disconnects the connection in order.
 */
bool DeviceClient::disconnect()
{
    int rc = 0;
    if(mqttClient->isConnected())
    {
        rc = mqttClient->disconnect();
    }

    //NetworkInterface* net = mqttNetwork->getEth();
    NetworkInterface* net = mqttNetwork->network;
    //EthernetInterface  net = mqttNetwork->network;

    mqttNetwork->disconnect();
    net->disconnect();
    connected = false;
    return rc == 0;
}

// Yield to allow MQTT client to process the command
void DeviceClient::yield(int ms)
{
    if(mqttClient->isConnected())
    {
        mqttClient->yield(ms);
    }
}

/**
 * Copy the device id into buf. When no device id was configured (NULL or
 * empty) the MAC address from getMac() is used instead.
 *
 * @param buf     destination buffer
 * @param buflen  size of buf in bytes
 * @return buf; the content is always NUL-terminated and truncated to
 *         buflen - 1 characters if necessary. buf is returned untouched when it
 *         is NULL or buflen is not positive.
 */
char* DeviceClient::getDeviceId(char* buf, int buflen)
{
    if(buf == NULL || buflen <= 0)
    {
        return buf;
    }

    if(this->deviceId == NULL || (strcmp("", this->deviceId) == 0))
    {
        return getMac(buf, buflen);
    }

    // strncpy() does not terminate when the source fills the buffer.
    strncpy(buf, this->deviceId, buflen - 1);
    buf[buflen - 1] = '\0';
    return buf;
}
/**
 * Write the network interface's MAC address into buf with the ':' separators
 * removed.
 *
 * @param buf     destination buffer
 * @param buflen  size of buf in bytes
 * @return buf; always NUL-terminated. The result is an empty string when the
 *         network objects are missing or the interface reports no MAC address.
 *         buf is returned untouched when it is NULL or buflen is not positive.
 */
char* DeviceClient::getMac(char* buf, int buflen)
{
    if(buf == NULL || buflen <= 0)
    {
        return buf;
    }
    buf[0] = '\0';

    if(mqttNetwork == NULL || mqttNetwork->network == NULL)
    {
        return buf;
    }

    const char* mac = mqttNetwork->network->get_mac_address();
    if(mac == NULL)
    {
        return buf;
    }

    // strncpy() does not terminate when the source fills the buffer.
    strncpy(buf, mac, buflen - 1);
    buf[buflen - 1] = '\0';

    // Remove colons from mac address: single in-place pass, never reads past the NUL.
    char* out = buf;
    for(const char* in = buf; *in != '\0'; ++in)
    {
        if(*in != ':')
        {
            *out++ = *in;
        }
    }
    *out = '\0';
    return buf;
}

char* DeviceClient::ipaddress() {
    //char iplocal[25];
    
    //NetworkInterface* net = mqttNetwork->getEth();
    NetworkInterface* net = mqttNetwork->network;
    //EthernetInterface  net = mqttNetwork->network;

    const char* ip = net->get_ip_address();
    
    //strcpy(iplocal,ip);
    //return iplocal;
    return (char*)net->get_ip_address();
}

//NetworkInterface* DeviceClient::eth() {
//    NetworkInterface* net = mqttNetwork->getEth();
//    return  (NetworkInterface*)net;
//}