Had to fork with a different name, because of some incompatibility issues.
Diff: DeviceClient.cpp
- Revision:
- 0:f86732d81998
- Child:
- 1:31c93319bbd8
- Child:
- 2:199ddea804cd
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/DeviceClient.cpp Fri Nov 06 07:09:14 2015 +0000 @@ -0,0 +1,296 @@ +/******************************************************************************* + * Copyright (c) 2015 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Sathisumar Palaniappan - initial implementation + * + *******************************************************************************/ + +#include "MQTTClient.h" +#include "DeviceClient.h" + +using namespace IoTF; + +CommandHandler handler = NULL; +void msgArrived(MQTT::MessageData& md); + +/** + * A client, used by device, that handles connections with the IBM Internet of Things Foundation. + * This class allows device to publish events and receive commands to/from IBM IoT Foundation wtih simple function calls. + */ +DeviceClient::DeviceClient():mqttClient(ipstack) { + LOG("Constructor#1 called::\n"); + this->org = NULL; + this->deviceType = NULL; + this->deviceId = NULL; + this->authMethod = NULL; + this->authToken = NULL; +} + +DeviceClient::DeviceClient(char *org, char *deviceType, char *deviceId): + mqttClient(ipstack) { + + LOG("Constructor#2 called:: org=%s, type=%s, id=%s", (org==NULL)?"NULL":org, + (deviceType==NULL)?"NULL":deviceType, (deviceId==NULL)?"NULL":deviceId); + + this->org = org; + this->deviceType = deviceType; + this->deviceId = deviceId; + this->authMethod = NULL; + this->authToken = NULL; + + if(strcmp(this->org, QUICKSTART) != 0) { + WARN("Registered flow must provide valid token"); + } +} + +DeviceClient::DeviceClient(char *org, char *deviceType, + char *deviceId, char *authMethod, char *authToken): + mqttClient(ipstack) { + + // Don't print token for security reasons + LOG("Constructor#3 called:: org=%s, type=%s, id=%s", (org==NULL)?"NULL":org, + (deviceType==NULL)?"NULL":deviceType, (deviceId==NULL)?"NULL":deviceId); + this->org = org; + this->deviceType = deviceType; + this->deviceId = deviceId; + this->authMethod = authMethod; + this->authToken = authToken; +} + +/** + * Connect to the IBM Internet of Things Foundation + */ +bool DeviceClient::connect() +{ + char *organizationName, *typeId, *id; + // Check if any organization is set + if(this->org == NULL) { + organizationName = QUICKSTART; + } else { + organizationName = this->org; + } + + // Check if device type is already mentioned + if(this->deviceType == NULL) { + typeId = "iotsample-mbed"; + } else { + typeId = this->deviceType; + } + + char hostname[strlen(organizationName) + strlen(IBM_IOT_MESSAGING) + 1]; + sprintf(hostname, "%s%s", organizationName, IBM_IOT_MESSAGING); + + EthernetInterface& eth = ipstack.getEth(); + + // Get devices MAC address if deviceId is not set already + if(this->deviceId == NULL || (strcmp("", this->deviceId) == 0)) { + char tmpBuf[50]; + id = getMac(tmpBuf, sizeof(tmpBuf)); + } else { + id = this->deviceId; + } + + // Construct clientId - d:org:type:id + char clientId[strlen(organizationName) + strlen(typeId) + strlen(id) + 5]; + sprintf(clientId, "d:%s:%s:%s", organizationName, typeId, id); + + logData(eth, hostname, clientId); + + // Initialize MQTT Connect + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + data.MQTTVersion = 4; + data.clientID.cstring = clientId; + + int quickstartMode = (strcmp(organizationName, QUICKSTART) == 0); + + if (!quickstartMode) { + data.username.cstring = "use-token-auth"; + data.password.cstring = this->authToken; + } + bool rc = tryConnect(hostname, data); + // By default subscribe to commands if we are in registered flow + if(rc == true && !quickstartMode) { + subscribeToCommands(); + } + return rc; +} + +bool DeviceClient::tryConnect(char *hostname, MQTTPacket_connectData &data) { + int rc = -1; + int retryAttempt = 0; + do { + rc = ipstack.connect(hostname, IBM_IOT_PORT, CONNECT_TIMEOUT); + if (rc != 0) { + WARN("IP Stack connect returned: %d\n", rc); + } + + // MQTT connect + if (rc == 0 && (rc = mqttClient.connect(data)) != 0) { + WARN("MQTT connect returned %d\n", rc); + if (rc == MQTT_NOT_AUTHORIZED || rc == MQTT_BAD_USERNAME_OR_PASSWORD) + return false; // don't reattempt to connect if credentials are wrong + } else if (rc == MQTT_CONNECTION_ACCEPTED) { + return true; + } + + int timeout = getConnTimeout(++retryAttempt); + + WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout); + + // if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed + // or maybe just add the proper members to do this disconnect and call attemptConnect(...) + + // this works - reset the system when the retry count gets to a threshold + if (retryAttempt == 5) + NVIC_SystemReset(); + else + wait(timeout); + } while(true); +} + +void DeviceClient::logData(EthernetInterface& eth, char *hostname, char *clientId) { + // Network debug statements + LOG("=====================================\n"); + LOG("Connecting Ethernet.\n"); + LOG("IP ADDRESS: %s\n", eth.getIPAddress()); + LOG("MAC ADDRESS: %s\n", eth.getMACAddress()); + LOG("Gateway: %s\n", eth.getGateway()); + LOG("Network Mask: %s\n", eth.getNetworkMask()); + LOG("Server Hostname: %s\n", hostname); + LOG("Client ID: %s\n", clientId); + LOG("=====================================\n"); +} + +int DeviceClient::getConnTimeout(int attemptNumber) +{ + // Try to increase the timeout every time + return (attemptNumber * attemptNumber * 5); +} + + +/** + * Publish data to the IBM Internet of Things Foundation. Note that data is published + * by default at Quality of Service (QoS) 0, which means that a successful send + * does not guarantee receipt even if the publish has been successful. + */ +bool DeviceClient::publishEvent(char *eventName, char *data, MQTT::QoS qos) +{ + if(!mqttClient.isConnected()) { + WARN("Client is not connected \n"); + return false; + } + + MQTT::Message message; + /* Topic format must be iot-2/evt/<eventName>/fmt/json (let us stick to json format for now) + * + * So length must be 10 + strlen(eventName) + 9 + 1 + * iot-2/evt/ = 10 + * /fmt/json = 9 + * NULL char = 1 + */ + + char topic[10 + strlen(eventName) + 9 + 1]; + sprintf(topic, "%s%s%s", "iot-2/evt/", eventName, "/fmt/json"); + + message.qos = qos; + message.retained = false; + message.dup = false; + message.payload = (void*)data; + message.payloadlen = strlen(data); + + LOG("Publishing %s\n", data); + int rc = mqttClient.publish(topic, message); + mqttClient.yield(10); + return rc == 0; +} + +void DeviceClient::setCommandCallback(CommandHandler callbackFunc) { + handler = callbackFunc; +} +/** + * Subscribe to commands from the application. This will be executed only for + * registered flow (quickstart flow does not support command publish) + */ +int DeviceClient::subscribeToCommands() { + int rc = 0; + // iot-2/cmd/+/fmt/+ + if ((rc = mqttClient.subscribe("iot-2/cmd/+/fmt/+", MQTT::QOS2, msgArrived)) != 0) + WARN("rc from MQTT subscribe is %d\n", rc); + return rc; +} + +/** + * Callback method to be registered with MQTT::Client. MQTT::Client calls whenever + * any command is published to the topic subscribed earlier. + */ +void msgArrived(MQTT::MessageData& md) +{ + // check whether callback is registered by the client code + if(handler == NULL) { + return; + } + + MQTT::Message &message = md.message; + char topic[md.topicName.lenstring.len + 1]; + + sprintf(topic, "%.*s", md.topicName.lenstring.len, md.topicName.lenstring.data); + + LOG("Message arrived on topic %s: %.*s\n", topic, message.payloadlen, message.payload); + + // Command topic: iot-2/cmd/blink/fmt/json - cmd is the string between cmd/ and /fmt/ + char* start = strstr(topic, "/cmd/") + 5; + int len = strstr(topic, "/fmt/") - start; + + char name[len + 1]; + + memcpy(name, start, len); + name[len] = NULL; + + start = strstr(topic, "/fmt/") + 5; + + char format[20]; // ToDO: need to find the length of the format + strcpy(format, start); + + char payload[message.payloadlen + 1]; + sprintf(payload, "%.*s", message.payloadlen, (char*)message.payload); + + IoTF::Command cmd(name, format, payload); + (*handler)(cmd); +} + +bool DeviceClient::disconnect() { + if(mqttClient.isConnected()) { + int rc = mqttClient.disconnect(); + return rc == 0; + } + return false; +} + +// Yield to allow MQTT client to process the command +void DeviceClient::yield(int ms) { + if(mqttClient.isConnected()) { + mqttClient.yield(ms); + } +} + +// Obtain MAC address +char* DeviceClient::getMac(char* buf, int buflen) +{ + EthernetInterface& eth = ipstack.getEth(); + strncpy(buf, eth.getMACAddress(), buflen); + + char* pos; // Remove colons from mac address + while ((pos = strchr(buf, ':')) != NULL) + memmove(pos, pos + 1, strlen(pos) + 1); + return buf; +} \ No newline at end of file