Demo application for using the AT&T IoT Starter Kit Powered by AWS.
Dependencies: SDFileSystem
Fork of ATT_AWS_IoT_demo by
IoT Starter Kit Powered by AWS Demo
This program demonstrates the AT&T IoT Starter Kit sending data directly into AWS IoT. It's explained and used in the Getting Started with the IoT Starter Kit Powered by AWS on starterkit.att.com.
What's required
- AT&T IoT LTE Add-on (also known as the Cellular Shield)
- NXP K64F - for programming
- microSD card - used to store your AWS security credentials
- AWS account
- Python, locally installed
If you don't already have an IoT Starter Kit, you can purchase a kit here. The IoT Starter Kit Powered by AWS includes the LTE cellular shield, K64F, and a microSD card.
Diff: AWS_openssl/aws_iot_src/shadow/aws_iot_shadow_records.cpp
- Revision:
- 15:6f2798e45099
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/AWS_openssl/aws_iot_src/shadow/aws_iot_shadow_records.cpp Thu Dec 01 18:05:38 2016 +0000 @@ -0,0 +1,486 @@ +/* + * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include "aws_iot_shadow_records.h" + +#include <string.h> +#include <stdio.h> + +#include "timer_interface.h" +#include "aws_iot_json_utils.h" +#include "aws_iot_log.h" +#include "aws_iot_shadow_json.h" +#include "aws_iot_config.h" + +typedef struct { + char clientTokenID[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE]; + char thingName[MAX_SIZE_OF_THING_NAME]; + ShadowActions_t action; + fpActionCallback_t callback; + void *pCallbackContext; + bool isFree; + Timer timer; +} ToBeReceivedAckRecord_t; + +typedef struct { + const char *pKey; + void *pStruct; + jsonStructCallback_t callback; + bool isFree; +} JsonTokenTable_t; + +typedef struct { + char Topic[MAX_SHADOW_TOPIC_LENGTH_BYTES]; + uint8_t count; + bool isFree; + bool isSticky; +} SubscriptionRecord_t; + +typedef enum { + SHADOW_ACCEPTED, SHADOW_REJECTED, SHADOW_ACTION +} ShadowAckTopicTypes_t; + +ToBeReceivedAckRecord_t AckWaitList[MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME]; + +MQTTClient_t *pMqttClient; + +char myThingName[MAX_SIZE_OF_THING_NAME]; +char mqttClientID[MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES]; + +char shadowDeltaTopic[MAX_SHADOW_TOPIC_LENGTH_BYTES]; + +#define MAX_TOPICS_AT_ANY_GIVEN_TIME 2*MAX_THINGNAME_HANDLED_AT_ANY_GIVEN_TIME +SubscriptionRecord_t SubscriptionList[MAX_TOPICS_AT_ANY_GIVEN_TIME]; + +#define SUBSCRIBE_SETTLING_TIME 2 +char shadowRxBuf[SHADOW_MAX_SIZE_OF_RX_BUFFER]; + +static JsonTokenTable_t tokenTable[MAX_JSON_TOKEN_EXPECTED]; +static uint32_t tokenTableIndex = 0; +static bool deltaTopicSubscribedFlag = false; +uint32_t shadowJsonVersionNum = 0; +bool shadowDiscardOldDeltaFlag = true; + +// local helper functions +static int AckStatusCallback(MQTTCallbackParams params); +static int shadow_delta_callback(MQTTCallbackParams params); +static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action, + ShadowAckTopicTypes_t ackType); +static int16_t getNextFreeIndexOfSubscriptionList(void); +static void unsubscribeFromAcceptedAndRejected(uint8_t index); + +void initDeltaTokens(void) { + uint32_t i; + for (i = 0; i < MAX_JSON_TOKEN_EXPECTED; i++) { + tokenTable[i].isFree = true; + } + tokenTableIndex = 0; + deltaTopicSubscribedFlag = false; +} + +IoT_Error_t registerJsonTokenOnDelta(jsonStruct_t *pStruct) { + + IoT_Error_t rc = NONE_ERROR; + + if (!deltaTopicSubscribedFlag) { + MQTTSubscribeParams subParams; + subParams.mHandler = shadow_delta_callback; + snprintf(shadowDeltaTopic,MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/update/delta", myThingName); + subParams.pTopic = shadowDeltaTopic; + subParams.qos = QOS_0; + rc = pMqttClient->subscribe(&subParams); + DEBUG("delta topic %s", shadowDeltaTopic); + deltaTopicSubscribedFlag = true; + } + + if (tokenTableIndex >= MAX_JSON_TOKEN_EXPECTED) { + return GENERIC_ERROR; + } + + tokenTable[tokenTableIndex].pKey = pStruct->pKey; + tokenTable[tokenTableIndex].callback = pStruct->cb; + tokenTable[tokenTableIndex].pStruct = pStruct; + tokenTable[tokenTableIndex].isFree = false; + tokenTableIndex++; + + return rc; +} + +static int16_t getNextFreeIndexOfSubscriptionList(void) { + uint8_t i; + for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { + if (SubscriptionList[i].isFree) { + SubscriptionList[i].isFree = false; + return i; + } + } + return -1; +} + +static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action, + ShadowAckTopicTypes_t ackType) { + + char actionBuf[10]; + char ackTypeBuf[10]; + + if (action == SHADOW_GET) { + strcpy(actionBuf, "get"); + } else if (action == SHADOW_UPDATE) { + strcpy(actionBuf, "update"); + } else if (action == SHADOW_DELETE) { + strcpy(actionBuf, "delete"); + } + + if (ackType == SHADOW_ACCEPTED) { + strcpy(ackTypeBuf, "accepted"); + } else if (ackType == SHADOW_REJECTED) { + strcpy(ackTypeBuf, "rejected"); + } + + if (ackType == SHADOW_ACTION) { + sprintf(pTopic, "$aws/things/%s/shadow/%s", pThingName, actionBuf); + } else { + sprintf(pTopic, "$aws/things/%s/shadow/%s/%s", pThingName, actionBuf, ackTypeBuf); + } +} + +static bool isAckForMyThingName(const char *pTopicName) { + if (strstr(pTopicName, myThingName) != NULL && ((strstr(pTopicName, "get/accepted") != NULL) || (strstr(pTopicName, "delta") != NULL))) { + return true; + } + return false; +} + +static int AckStatusCallback(MQTTCallbackParams params) { + int32_t tokenCount; + int32_t i; + void *pJsonHandler; + char temporaryClientToken[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE]; + + if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) { + return GENERIC_ERROR; + } + + memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen); + shadowRxBuf[params.MessageParams.PayloadLen] = '\0'; // jsmn_parse relies on a string + + if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) { + WARN("Received JSON is not valid"); + return GENERIC_ERROR; + } + + if (isAckForMyThingName(params.pTopicName)) { + uint32_t tempVersionNumber = 0; + if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) { + if (tempVersionNumber > shadowJsonVersionNum) { + shadowJsonVersionNum = tempVersionNumber; + } + } + } + + if (extractClientToken(shadowRxBuf, temporaryClientToken)) { + for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { + if (!AckWaitList[i].isFree) { + if (strcmp(AckWaitList[i].clientTokenID, temporaryClientToken) == 0) { + Shadow_Ack_Status_t status; + if (strstr(params.pTopicName, "accepted") != NULL) { + status = SHADOW_ACK_ACCEPTED; + } else if (strstr(params.pTopicName, "rejected") != NULL) { + status = SHADOW_ACK_REJECTED; + } + if (status == SHADOW_ACK_ACCEPTED || status == SHADOW_ACK_REJECTED) { + if (AckWaitList[i].callback != NULL) { + AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, status, + shadowRxBuf, AckWaitList[i].pCallbackContext); + } + unsubscribeFromAcceptedAndRejected(i); + AckWaitList[i].isFree = true; + return NONE_ERROR; + } + } + } + } + } + + return GENERIC_ERROR; +} + +static int16_t findIndexOfSubscriptionList(const char *pTopic) { + uint8_t i; + for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { + if (!SubscriptionList[i].isFree) { + if ((strcmp(pTopic, SubscriptionList[i].Topic) == 0)) { + return i; + } + } + } + return -1; +} + +static void unsubscribeFromAcceptedAndRejected(uint8_t index) { + + char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES]; + char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES]; + IoT_Error_t ret_val = NONE_ERROR; + + topicNameFromThingAndAction(TemporaryTopicNameAccepted, AckWaitList[index].thingName, AckWaitList[index].action, + SHADOW_ACCEPTED); + topicNameFromThingAndAction(TemporaryTopicNameRejected, AckWaitList[index].thingName, AckWaitList[index].action, + SHADOW_REJECTED); + + int16_t indexSubList; + + indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameAccepted); + if ((indexSubList >= 0)) { + if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) { + ret_val = pMqttClient->unsubscribe(TemporaryTopicNameAccepted); + if (ret_val == NONE_ERROR) { + SubscriptionList[indexSubList].isFree = true; + } + } else if (SubscriptionList[indexSubList].count > 1) { + SubscriptionList[indexSubList].count--; + } + } + + indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameRejected); + if ((indexSubList >= 0)) { + if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) { + ret_val = pMqttClient->unsubscribe(TemporaryTopicNameRejected); + if (ret_val == NONE_ERROR) { + SubscriptionList[indexSubList].isFree = true; + } + } else if (SubscriptionList[indexSubList].count > 1) { + SubscriptionList[indexSubList].count--; + } + } +} + +void initializeRecords(MQTTClient_t *pClient) { + uint8_t i; + for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { + AckWaitList[i].isFree = true; + } + for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { + SubscriptionList[i].isFree = true; + SubscriptionList[i].count = 0; + SubscriptionList[i].isSticky = false; + } + pMqttClient = pClient; +} + +bool isSubscriptionPresent(const char *pThingName, ShadowActions_t action) { + + uint8_t i = 0; + bool isAcceptedPresent = false; + bool isRejectedPresent = false; + char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES]; + char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES]; + + topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED); + topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED); + + for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { + if (!SubscriptionList[i].isFree) { + if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)) { + isAcceptedPresent = true; + } else if ((strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) { + isRejectedPresent = true; + } + } + } + + if (isRejectedPresent && isAcceptedPresent) { + return true; + } + + return false; +} + +IoT_Error_t subscribeToShadowActionAcks(const char *pThingName, ShadowActions_t action, bool isSticky) { + IoT_Error_t ret_val = NONE_ERROR; + MQTTSubscribeParams subParams = MQTTSubscribeParamsDefault; + + bool clearBothEntriesFromList = true; + int16_t indexAcceptedSubList = 0; + int16_t indexRejectedSubList = 0; + indexAcceptedSubList = getNextFreeIndexOfSubscriptionList(); + indexRejectedSubList = getNextFreeIndexOfSubscriptionList(); + + if (indexAcceptedSubList >= 0 && indexRejectedSubList >= 0) { + topicNameFromThingAndAction(SubscriptionList[indexAcceptedSubList].Topic, pThingName, action, SHADOW_ACCEPTED); + subParams.mHandler = AckStatusCallback; + subParams.qos = QOS_0; + subParams.pTopic = SubscriptionList[indexAcceptedSubList].Topic; + ret_val = pMqttClient->subscribe(&subParams); + if (ret_val == NONE_ERROR) { + SubscriptionList[indexAcceptedSubList].count = 1; + SubscriptionList[indexAcceptedSubList].isSticky = isSticky; + topicNameFromThingAndAction(SubscriptionList[indexRejectedSubList].Topic, pThingName, action, + SHADOW_REJECTED); + subParams.pTopic = SubscriptionList[indexRejectedSubList].Topic; + ret_val = pMqttClient->subscribe(&subParams); + if (ret_val == NONE_ERROR) { + SubscriptionList[indexRejectedSubList].count = 1; + SubscriptionList[indexRejectedSubList].isSticky = isSticky; + clearBothEntriesFromList = false; + + // wait for SUBSCRIBE_SETTLING_TIME seconds to let the subscription take effect + Timer subSettlingtimer; + InitTimer(&subSettlingtimer); + countdown(&subSettlingtimer, SUBSCRIBE_SETTLING_TIME); + while(!expired(&subSettlingtimer)); + + } + } + } + + if (clearBothEntriesFromList) { + if (indexAcceptedSubList >= 0) { + SubscriptionList[indexAcceptedSubList].isFree = true; + } else if (indexRejectedSubList >= 0) { + SubscriptionList[indexRejectedSubList].isFree = true; + } + if (SubscriptionList[indexAcceptedSubList].count == 1) { + pMqttClient->unsubscribe(SubscriptionList[indexAcceptedSubList].Topic); + } + } + + return ret_val; +} + +void incrementSubscriptionCnt(const char *pThingName, ShadowActions_t action, bool isSticky) { + char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES]; + char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES]; + uint8_t i; + topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED); + topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED); + + for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { + if (!SubscriptionList[i].isFree) { + if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0) + || (strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) { + SubscriptionList[i].count++; + SubscriptionList[i].isSticky = isSticky; + } + } + } +} + +IoT_Error_t publishToShadowAction(const char * pThingName, ShadowActions_t action, const char *pJsonDocumentToBeSent) { + IoT_Error_t ret_val = NONE_ERROR; + char TemporaryTopicName[MAX_SHADOW_TOPIC_LENGTH_BYTES]; + topicNameFromThingAndAction(TemporaryTopicName, pThingName, action, SHADOW_ACTION); + + MQTTPublishParams pubParams = MQTTPublishParamsDefault; + pubParams.pTopic = TemporaryTopicName; + MQTTMessageParams msgParams = MQTTMessageParamsDefault; + msgParams.qos = QOS_0; + msgParams.PayloadLen = strlen(pJsonDocumentToBeSent) + 1; + msgParams.pPayload = (char *) pJsonDocumentToBeSent; + pubParams.MessageParams = msgParams; + ret_val = pMqttClient->publish(&pubParams); + + return ret_val; +} + +bool getNextFreeIndexOfAckWaitList(uint8_t *pIndex) { + uint8_t i; + if (pIndex != NULL) { + for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { + if (AckWaitList[i].isFree) { + *pIndex = i; + return true; + } + } + } + return false; +} + +void addToAckWaitList(uint8_t indexAckWaitList, const char *pThingName, ShadowActions_t action, + const char *pExtractedClientToken, fpActionCallback_t callback, void *pCallbackContext, + uint32_t timeout_seconds) { + AckWaitList[indexAckWaitList].callback = callback; + strncpy(AckWaitList[indexAckWaitList].clientTokenID, pExtractedClientToken, MAX_SIZE_CLIENT_ID_WITH_SEQUENCE); + strncpy(AckWaitList[indexAckWaitList].thingName, pThingName, MAX_SIZE_OF_THING_NAME); + AckWaitList[indexAckWaitList].pCallbackContext = pCallbackContext; + AckWaitList[indexAckWaitList].action = action; + InitTimer(&(AckWaitList[indexAckWaitList].timer)); + countdown(&(AckWaitList[indexAckWaitList].timer), timeout_seconds); + AckWaitList[indexAckWaitList].isFree = false; +} + +void HandleExpiredResponseCallbacks(void) { + uint8_t i; + for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { + if (!AckWaitList[i].isFree) { + if (expired(&(AckWaitList[i].timer))) { + if (AckWaitList[i].callback != NULL) { + AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, SHADOW_ACK_TIMEOUT, + shadowRxBuf, AckWaitList[i].pCallbackContext); + } + AckWaitList[i].isFree = true; + unsubscribeFromAcceptedAndRejected(i); + } + } + } +} + +static int shadow_delta_callback(MQTTCallbackParams params) { + + int32_t tokenCount; + uint32_t i = 0; + void *pJsonHandler; + int32_t DataPosition; + uint32_t dataLength; + + if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) { + return GENERIC_ERROR; + } + + memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen); + shadowRxBuf[params.MessageParams.PayloadLen] = '\0'; // jsmn_parse relies on a string + + if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) { + WARN("Received JSON is not valid"); + return GENERIC_ERROR; + } + + if (shadowDiscardOldDeltaFlag) { + uint32_t tempVersionNumber = 0; + if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) { + if (tempVersionNumber > shadowJsonVersionNum) { + shadowJsonVersionNum = tempVersionNumber; + DEBUG("New Version number: %d", shadowJsonVersionNum); + } else { + WARN("Old Delta Message received - Ignoring rx: %d local: %d", tempVersionNumber, shadowJsonVersionNum); + return GENERIC_ERROR; + } + } + } + + for (i = 0; i < tokenTableIndex; i++) { + if (!tokenTable[i].isFree) { + if (isJsonKeyMatchingAndUpdateValue(shadowRxBuf, pJsonHandler, tokenCount, (jsonStruct_t *)tokenTable[i].pStruct, + &dataLength, &DataPosition)) { + if (tokenTable[i].callback != NULL) { + tokenTable[i].callback(shadowRxBuf + DataPosition, dataLength, (jsonStruct_t *)tokenTable[i].pStruct); + } + } + } + } + + return NONE_ERROR; +} +