Demo application for using the AT&T IoT Starter Kit Powered by AWS.
Dependencies: SDFileSystem
Fork of ATT_AWS_IoT_demo by
IoT Starter Kit Powered by AWS Demo
This program demonstrates the AT&T IoT Starter Kit sending data directly into AWS IoT. It's explained and used in the Getting Started with the IoT Starter Kit Powered by AWS on starterkit.att.com.
What's required
- AT&T IoT LTE Add-on (also known as the Cellular Shield)
- NXP K64F - for programming
- microSD card - used to store your AWS security credentials
- AWS account
- Python, locally installed
If you don't already have an IoT Starter Kit, you can purchase a kit here. The IoT Starter Kit Powered by AWS includes the LTE cellular shield, K64F, and a microSD card.
AWS_openssl/aws_iot_src/shadow/aws_iot_shadow_records.cpp@27:2f486c766854, 2017-02-07 (annotated)
- Committer:
- rfinn
- Date:
- Tue Feb 07 16:18:57 2017 +0000
- Revision:
- 27:2f486c766854
- Parent:
- 15:6f2798e45099
changed SDFileSystem library
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
ampembeng | 15:6f2798e45099 | 1 | /* |
ampembeng | 15:6f2798e45099 | 2 | * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
ampembeng | 15:6f2798e45099 | 3 | * |
ampembeng | 15:6f2798e45099 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"). |
ampembeng | 15:6f2798e45099 | 5 | * You may not use this file except in compliance with the License. |
ampembeng | 15:6f2798e45099 | 6 | * A copy of the License is located at |
ampembeng | 15:6f2798e45099 | 7 | * |
ampembeng | 15:6f2798e45099 | 8 | * http://aws.amazon.com/apache2.0 |
ampembeng | 15:6f2798e45099 | 9 | * |
ampembeng | 15:6f2798e45099 | 10 | * or in the "license" file accompanying this file. This file is distributed |
ampembeng | 15:6f2798e45099 | 11 | * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either |
ampembeng | 15:6f2798e45099 | 12 | * express or implied. See the License for the specific language governing |
ampembeng | 15:6f2798e45099 | 13 | * permissions and limitations under the License. |
ampembeng | 15:6f2798e45099 | 14 | */ |
ampembeng | 15:6f2798e45099 | 15 | |
ampembeng | 15:6f2798e45099 | 16 | #include "aws_iot_shadow_records.h" |
ampembeng | 15:6f2798e45099 | 17 | |
ampembeng | 15:6f2798e45099 | 18 | #include <string.h> |
ampembeng | 15:6f2798e45099 | 19 | #include <stdio.h> |
ampembeng | 15:6f2798e45099 | 20 | |
ampembeng | 15:6f2798e45099 | 21 | #include "timer_interface.h" |
ampembeng | 15:6f2798e45099 | 22 | #include "aws_iot_json_utils.h" |
ampembeng | 15:6f2798e45099 | 23 | #include "aws_iot_log.h" |
ampembeng | 15:6f2798e45099 | 24 | #include "aws_iot_shadow_json.h" |
ampembeng | 15:6f2798e45099 | 25 | #include "aws_iot_config.h" |
ampembeng | 15:6f2798e45099 | 26 | |
ampembeng | 15:6f2798e45099 | 27 | typedef struct { |
ampembeng | 15:6f2798e45099 | 28 | char clientTokenID[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE]; |
ampembeng | 15:6f2798e45099 | 29 | char thingName[MAX_SIZE_OF_THING_NAME]; |
ampembeng | 15:6f2798e45099 | 30 | ShadowActions_t action; |
ampembeng | 15:6f2798e45099 | 31 | fpActionCallback_t callback; |
ampembeng | 15:6f2798e45099 | 32 | void *pCallbackContext; |
ampembeng | 15:6f2798e45099 | 33 | bool isFree; |
ampembeng | 15:6f2798e45099 | 34 | Timer timer; |
ampembeng | 15:6f2798e45099 | 35 | } ToBeReceivedAckRecord_t; |
ampembeng | 15:6f2798e45099 | 36 | |
ampembeng | 15:6f2798e45099 | 37 | typedef struct { |
ampembeng | 15:6f2798e45099 | 38 | const char *pKey; |
ampembeng | 15:6f2798e45099 | 39 | void *pStruct; |
ampembeng | 15:6f2798e45099 | 40 | jsonStructCallback_t callback; |
ampembeng | 15:6f2798e45099 | 41 | bool isFree; |
ampembeng | 15:6f2798e45099 | 42 | } JsonTokenTable_t; |
ampembeng | 15:6f2798e45099 | 43 | |
ampembeng | 15:6f2798e45099 | 44 | typedef struct { |
ampembeng | 15:6f2798e45099 | 45 | char Topic[MAX_SHADOW_TOPIC_LENGTH_BYTES]; |
ampembeng | 15:6f2798e45099 | 46 | uint8_t count; |
ampembeng | 15:6f2798e45099 | 47 | bool isFree; |
ampembeng | 15:6f2798e45099 | 48 | bool isSticky; |
ampembeng | 15:6f2798e45099 | 49 | } SubscriptionRecord_t; |
ampembeng | 15:6f2798e45099 | 50 | |
ampembeng | 15:6f2798e45099 | 51 | typedef enum { |
ampembeng | 15:6f2798e45099 | 52 | SHADOW_ACCEPTED, SHADOW_REJECTED, SHADOW_ACTION |
ampembeng | 15:6f2798e45099 | 53 | } ShadowAckTopicTypes_t; |
ampembeng | 15:6f2798e45099 | 54 | |
ampembeng | 15:6f2798e45099 | 55 | ToBeReceivedAckRecord_t AckWaitList[MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME]; |
ampembeng | 15:6f2798e45099 | 56 | |
ampembeng | 15:6f2798e45099 | 57 | MQTTClient_t *pMqttClient; |
ampembeng | 15:6f2798e45099 | 58 | |
ampembeng | 15:6f2798e45099 | 59 | char myThingName[MAX_SIZE_OF_THING_NAME]; |
ampembeng | 15:6f2798e45099 | 60 | char mqttClientID[MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES]; |
ampembeng | 15:6f2798e45099 | 61 | |
ampembeng | 15:6f2798e45099 | 62 | char shadowDeltaTopic[MAX_SHADOW_TOPIC_LENGTH_BYTES]; |
ampembeng | 15:6f2798e45099 | 63 | |
ampembeng | 15:6f2798e45099 | 64 | #define MAX_TOPICS_AT_ANY_GIVEN_TIME 2*MAX_THINGNAME_HANDLED_AT_ANY_GIVEN_TIME |
ampembeng | 15:6f2798e45099 | 65 | SubscriptionRecord_t SubscriptionList[MAX_TOPICS_AT_ANY_GIVEN_TIME]; |
ampembeng | 15:6f2798e45099 | 66 | |
ampembeng | 15:6f2798e45099 | 67 | #define SUBSCRIBE_SETTLING_TIME 2 |
ampembeng | 15:6f2798e45099 | 68 | char shadowRxBuf[SHADOW_MAX_SIZE_OF_RX_BUFFER]; |
ampembeng | 15:6f2798e45099 | 69 | |
ampembeng | 15:6f2798e45099 | 70 | static JsonTokenTable_t tokenTable[MAX_JSON_TOKEN_EXPECTED]; |
ampembeng | 15:6f2798e45099 | 71 | static uint32_t tokenTableIndex = 0; |
ampembeng | 15:6f2798e45099 | 72 | static bool deltaTopicSubscribedFlag = false; |
ampembeng | 15:6f2798e45099 | 73 | uint32_t shadowJsonVersionNum = 0; |
ampembeng | 15:6f2798e45099 | 74 | bool shadowDiscardOldDeltaFlag = true; |
ampembeng | 15:6f2798e45099 | 75 | |
ampembeng | 15:6f2798e45099 | 76 | // local helper functions |
ampembeng | 15:6f2798e45099 | 77 | static int AckStatusCallback(MQTTCallbackParams params); |
ampembeng | 15:6f2798e45099 | 78 | static int shadow_delta_callback(MQTTCallbackParams params); |
ampembeng | 15:6f2798e45099 | 79 | static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action, |
ampembeng | 15:6f2798e45099 | 80 | ShadowAckTopicTypes_t ackType); |
ampembeng | 15:6f2798e45099 | 81 | static int16_t getNextFreeIndexOfSubscriptionList(void); |
ampembeng | 15:6f2798e45099 | 82 | static void unsubscribeFromAcceptedAndRejected(uint8_t index); |
ampembeng | 15:6f2798e45099 | 83 | |
ampembeng | 15:6f2798e45099 | 84 | void initDeltaTokens(void) { |
ampembeng | 15:6f2798e45099 | 85 | uint32_t i; |
ampembeng | 15:6f2798e45099 | 86 | for (i = 0; i < MAX_JSON_TOKEN_EXPECTED; i++) { |
ampembeng | 15:6f2798e45099 | 87 | tokenTable[i].isFree = true; |
ampembeng | 15:6f2798e45099 | 88 | } |
ampembeng | 15:6f2798e45099 | 89 | tokenTableIndex = 0; |
ampembeng | 15:6f2798e45099 | 90 | deltaTopicSubscribedFlag = false; |
ampembeng | 15:6f2798e45099 | 91 | } |
ampembeng | 15:6f2798e45099 | 92 | |
ampembeng | 15:6f2798e45099 | 93 | IoT_Error_t registerJsonTokenOnDelta(jsonStruct_t *pStruct) { |
ampembeng | 15:6f2798e45099 | 94 | |
ampembeng | 15:6f2798e45099 | 95 | IoT_Error_t rc = NONE_ERROR; |
ampembeng | 15:6f2798e45099 | 96 | |
ampembeng | 15:6f2798e45099 | 97 | if (!deltaTopicSubscribedFlag) { |
ampembeng | 15:6f2798e45099 | 98 | MQTTSubscribeParams subParams; |
ampembeng | 15:6f2798e45099 | 99 | subParams.mHandler = shadow_delta_callback; |
ampembeng | 15:6f2798e45099 | 100 | snprintf(shadowDeltaTopic,MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/update/delta", myThingName); |
ampembeng | 15:6f2798e45099 | 101 | subParams.pTopic = shadowDeltaTopic; |
ampembeng | 15:6f2798e45099 | 102 | subParams.qos = QOS_0; |
ampembeng | 15:6f2798e45099 | 103 | rc = pMqttClient->subscribe(&subParams); |
ampembeng | 15:6f2798e45099 | 104 | DEBUG("delta topic %s", shadowDeltaTopic); |
ampembeng | 15:6f2798e45099 | 105 | deltaTopicSubscribedFlag = true; |
ampembeng | 15:6f2798e45099 | 106 | } |
ampembeng | 15:6f2798e45099 | 107 | |
ampembeng | 15:6f2798e45099 | 108 | if (tokenTableIndex >= MAX_JSON_TOKEN_EXPECTED) { |
ampembeng | 15:6f2798e45099 | 109 | return GENERIC_ERROR; |
ampembeng | 15:6f2798e45099 | 110 | } |
ampembeng | 15:6f2798e45099 | 111 | |
ampembeng | 15:6f2798e45099 | 112 | tokenTable[tokenTableIndex].pKey = pStruct->pKey; |
ampembeng | 15:6f2798e45099 | 113 | tokenTable[tokenTableIndex].callback = pStruct->cb; |
ampembeng | 15:6f2798e45099 | 114 | tokenTable[tokenTableIndex].pStruct = pStruct; |
ampembeng | 15:6f2798e45099 | 115 | tokenTable[tokenTableIndex].isFree = false; |
ampembeng | 15:6f2798e45099 | 116 | tokenTableIndex++; |
ampembeng | 15:6f2798e45099 | 117 | |
ampembeng | 15:6f2798e45099 | 118 | return rc; |
ampembeng | 15:6f2798e45099 | 119 | } |
ampembeng | 15:6f2798e45099 | 120 | |
ampembeng | 15:6f2798e45099 | 121 | static int16_t getNextFreeIndexOfSubscriptionList(void) { |
ampembeng | 15:6f2798e45099 | 122 | uint8_t i; |
ampembeng | 15:6f2798e45099 | 123 | for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { |
ampembeng | 15:6f2798e45099 | 124 | if (SubscriptionList[i].isFree) { |
ampembeng | 15:6f2798e45099 | 125 | SubscriptionList[i].isFree = false; |
ampembeng | 15:6f2798e45099 | 126 | return i; |
ampembeng | 15:6f2798e45099 | 127 | } |
ampembeng | 15:6f2798e45099 | 128 | } |
ampembeng | 15:6f2798e45099 | 129 | return -1; |
ampembeng | 15:6f2798e45099 | 130 | } |
ampembeng | 15:6f2798e45099 | 131 | |
ampembeng | 15:6f2798e45099 | 132 | static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action, |
ampembeng | 15:6f2798e45099 | 133 | ShadowAckTopicTypes_t ackType) { |
ampembeng | 15:6f2798e45099 | 134 | |
ampembeng | 15:6f2798e45099 | 135 | char actionBuf[10]; |
ampembeng | 15:6f2798e45099 | 136 | char ackTypeBuf[10]; |
ampembeng | 15:6f2798e45099 | 137 | |
ampembeng | 15:6f2798e45099 | 138 | if (action == SHADOW_GET) { |
ampembeng | 15:6f2798e45099 | 139 | strcpy(actionBuf, "get"); |
ampembeng | 15:6f2798e45099 | 140 | } else if (action == SHADOW_UPDATE) { |
ampembeng | 15:6f2798e45099 | 141 | strcpy(actionBuf, "update"); |
ampembeng | 15:6f2798e45099 | 142 | } else if (action == SHADOW_DELETE) { |
ampembeng | 15:6f2798e45099 | 143 | strcpy(actionBuf, "delete"); |
ampembeng | 15:6f2798e45099 | 144 | } |
ampembeng | 15:6f2798e45099 | 145 | |
ampembeng | 15:6f2798e45099 | 146 | if (ackType == SHADOW_ACCEPTED) { |
ampembeng | 15:6f2798e45099 | 147 | strcpy(ackTypeBuf, "accepted"); |
ampembeng | 15:6f2798e45099 | 148 | } else if (ackType == SHADOW_REJECTED) { |
ampembeng | 15:6f2798e45099 | 149 | strcpy(ackTypeBuf, "rejected"); |
ampembeng | 15:6f2798e45099 | 150 | } |
ampembeng | 15:6f2798e45099 | 151 | |
ampembeng | 15:6f2798e45099 | 152 | if (ackType == SHADOW_ACTION) { |
ampembeng | 15:6f2798e45099 | 153 | sprintf(pTopic, "$aws/things/%s/shadow/%s", pThingName, actionBuf); |
ampembeng | 15:6f2798e45099 | 154 | } else { |
ampembeng | 15:6f2798e45099 | 155 | sprintf(pTopic, "$aws/things/%s/shadow/%s/%s", pThingName, actionBuf, ackTypeBuf); |
ampembeng | 15:6f2798e45099 | 156 | } |
ampembeng | 15:6f2798e45099 | 157 | } |
ampembeng | 15:6f2798e45099 | 158 | |
ampembeng | 15:6f2798e45099 | 159 | static bool isAckForMyThingName(const char *pTopicName) { |
ampembeng | 15:6f2798e45099 | 160 | if (strstr(pTopicName, myThingName) != NULL && ((strstr(pTopicName, "get/accepted") != NULL) || (strstr(pTopicName, "delta") != NULL))) { |
ampembeng | 15:6f2798e45099 | 161 | return true; |
ampembeng | 15:6f2798e45099 | 162 | } |
ampembeng | 15:6f2798e45099 | 163 | return false; |
ampembeng | 15:6f2798e45099 | 164 | } |
ampembeng | 15:6f2798e45099 | 165 | |
ampembeng | 15:6f2798e45099 | 166 | static int AckStatusCallback(MQTTCallbackParams params) { |
ampembeng | 15:6f2798e45099 | 167 | int32_t tokenCount; |
ampembeng | 15:6f2798e45099 | 168 | int32_t i; |
ampembeng | 15:6f2798e45099 | 169 | void *pJsonHandler; |
ampembeng | 15:6f2798e45099 | 170 | char temporaryClientToken[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE]; |
ampembeng | 15:6f2798e45099 | 171 | |
ampembeng | 15:6f2798e45099 | 172 | if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) { |
ampembeng | 15:6f2798e45099 | 173 | return GENERIC_ERROR; |
ampembeng | 15:6f2798e45099 | 174 | } |
ampembeng | 15:6f2798e45099 | 175 | |
ampembeng | 15:6f2798e45099 | 176 | memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen); |
ampembeng | 15:6f2798e45099 | 177 | shadowRxBuf[params.MessageParams.PayloadLen] = '\0'; // jsmn_parse relies on a string |
ampembeng | 15:6f2798e45099 | 178 | |
ampembeng | 15:6f2798e45099 | 179 | if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) { |
ampembeng | 15:6f2798e45099 | 180 | WARN("Received JSON is not valid"); |
ampembeng | 15:6f2798e45099 | 181 | return GENERIC_ERROR; |
ampembeng | 15:6f2798e45099 | 182 | } |
ampembeng | 15:6f2798e45099 | 183 | |
ampembeng | 15:6f2798e45099 | 184 | if (isAckForMyThingName(params.pTopicName)) { |
ampembeng | 15:6f2798e45099 | 185 | uint32_t tempVersionNumber = 0; |
ampembeng | 15:6f2798e45099 | 186 | if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) { |
ampembeng | 15:6f2798e45099 | 187 | if (tempVersionNumber > shadowJsonVersionNum) { |
ampembeng | 15:6f2798e45099 | 188 | shadowJsonVersionNum = tempVersionNumber; |
ampembeng | 15:6f2798e45099 | 189 | } |
ampembeng | 15:6f2798e45099 | 190 | } |
ampembeng | 15:6f2798e45099 | 191 | } |
ampembeng | 15:6f2798e45099 | 192 | |
ampembeng | 15:6f2798e45099 | 193 | if (extractClientToken(shadowRxBuf, temporaryClientToken)) { |
ampembeng | 15:6f2798e45099 | 194 | for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { |
ampembeng | 15:6f2798e45099 | 195 | if (!AckWaitList[i].isFree) { |
ampembeng | 15:6f2798e45099 | 196 | if (strcmp(AckWaitList[i].clientTokenID, temporaryClientToken) == 0) { |
ampembeng | 15:6f2798e45099 | 197 | Shadow_Ack_Status_t status; |
ampembeng | 15:6f2798e45099 | 198 | if (strstr(params.pTopicName, "accepted") != NULL) { |
ampembeng | 15:6f2798e45099 | 199 | status = SHADOW_ACK_ACCEPTED; |
ampembeng | 15:6f2798e45099 | 200 | } else if (strstr(params.pTopicName, "rejected") != NULL) { |
ampembeng | 15:6f2798e45099 | 201 | status = SHADOW_ACK_REJECTED; |
ampembeng | 15:6f2798e45099 | 202 | } |
ampembeng | 15:6f2798e45099 | 203 | if (status == SHADOW_ACK_ACCEPTED || status == SHADOW_ACK_REJECTED) { |
ampembeng | 15:6f2798e45099 | 204 | if (AckWaitList[i].callback != NULL) { |
ampembeng | 15:6f2798e45099 | 205 | AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, status, |
ampembeng | 15:6f2798e45099 | 206 | shadowRxBuf, AckWaitList[i].pCallbackContext); |
ampembeng | 15:6f2798e45099 | 207 | } |
ampembeng | 15:6f2798e45099 | 208 | unsubscribeFromAcceptedAndRejected(i); |
ampembeng | 15:6f2798e45099 | 209 | AckWaitList[i].isFree = true; |
ampembeng | 15:6f2798e45099 | 210 | return NONE_ERROR; |
ampembeng | 15:6f2798e45099 | 211 | } |
ampembeng | 15:6f2798e45099 | 212 | } |
ampembeng | 15:6f2798e45099 | 213 | } |
ampembeng | 15:6f2798e45099 | 214 | } |
ampembeng | 15:6f2798e45099 | 215 | } |
ampembeng | 15:6f2798e45099 | 216 | |
ampembeng | 15:6f2798e45099 | 217 | return GENERIC_ERROR; |
ampembeng | 15:6f2798e45099 | 218 | } |
ampembeng | 15:6f2798e45099 | 219 | |
ampembeng | 15:6f2798e45099 | 220 | static int16_t findIndexOfSubscriptionList(const char *pTopic) { |
ampembeng | 15:6f2798e45099 | 221 | uint8_t i; |
ampembeng | 15:6f2798e45099 | 222 | for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { |
ampembeng | 15:6f2798e45099 | 223 | if (!SubscriptionList[i].isFree) { |
ampembeng | 15:6f2798e45099 | 224 | if ((strcmp(pTopic, SubscriptionList[i].Topic) == 0)) { |
ampembeng | 15:6f2798e45099 | 225 | return i; |
ampembeng | 15:6f2798e45099 | 226 | } |
ampembeng | 15:6f2798e45099 | 227 | } |
ampembeng | 15:6f2798e45099 | 228 | } |
ampembeng | 15:6f2798e45099 | 229 | return -1; |
ampembeng | 15:6f2798e45099 | 230 | } |
ampembeng | 15:6f2798e45099 | 231 | |
ampembeng | 15:6f2798e45099 | 232 | static void unsubscribeFromAcceptedAndRejected(uint8_t index) { |
ampembeng | 15:6f2798e45099 | 233 | |
ampembeng | 15:6f2798e45099 | 234 | char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES]; |
ampembeng | 15:6f2798e45099 | 235 | char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES]; |
ampembeng | 15:6f2798e45099 | 236 | IoT_Error_t ret_val = NONE_ERROR; |
ampembeng | 15:6f2798e45099 | 237 | |
ampembeng | 15:6f2798e45099 | 238 | topicNameFromThingAndAction(TemporaryTopicNameAccepted, AckWaitList[index].thingName, AckWaitList[index].action, |
ampembeng | 15:6f2798e45099 | 239 | SHADOW_ACCEPTED); |
ampembeng | 15:6f2798e45099 | 240 | topicNameFromThingAndAction(TemporaryTopicNameRejected, AckWaitList[index].thingName, AckWaitList[index].action, |
ampembeng | 15:6f2798e45099 | 241 | SHADOW_REJECTED); |
ampembeng | 15:6f2798e45099 | 242 | |
ampembeng | 15:6f2798e45099 | 243 | int16_t indexSubList; |
ampembeng | 15:6f2798e45099 | 244 | |
ampembeng | 15:6f2798e45099 | 245 | indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameAccepted); |
ampembeng | 15:6f2798e45099 | 246 | if ((indexSubList >= 0)) { |
ampembeng | 15:6f2798e45099 | 247 | if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) { |
ampembeng | 15:6f2798e45099 | 248 | ret_val = pMqttClient->unsubscribe(TemporaryTopicNameAccepted); |
ampembeng | 15:6f2798e45099 | 249 | if (ret_val == NONE_ERROR) { |
ampembeng | 15:6f2798e45099 | 250 | SubscriptionList[indexSubList].isFree = true; |
ampembeng | 15:6f2798e45099 | 251 | } |
ampembeng | 15:6f2798e45099 | 252 | } else if (SubscriptionList[indexSubList].count > 1) { |
ampembeng | 15:6f2798e45099 | 253 | SubscriptionList[indexSubList].count--; |
ampembeng | 15:6f2798e45099 | 254 | } |
ampembeng | 15:6f2798e45099 | 255 | } |
ampembeng | 15:6f2798e45099 | 256 | |
ampembeng | 15:6f2798e45099 | 257 | indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameRejected); |
ampembeng | 15:6f2798e45099 | 258 | if ((indexSubList >= 0)) { |
ampembeng | 15:6f2798e45099 | 259 | if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) { |
ampembeng | 15:6f2798e45099 | 260 | ret_val = pMqttClient->unsubscribe(TemporaryTopicNameRejected); |
ampembeng | 15:6f2798e45099 | 261 | if (ret_val == NONE_ERROR) { |
ampembeng | 15:6f2798e45099 | 262 | SubscriptionList[indexSubList].isFree = true; |
ampembeng | 15:6f2798e45099 | 263 | } |
ampembeng | 15:6f2798e45099 | 264 | } else if (SubscriptionList[indexSubList].count > 1) { |
ampembeng | 15:6f2798e45099 | 265 | SubscriptionList[indexSubList].count--; |
ampembeng | 15:6f2798e45099 | 266 | } |
ampembeng | 15:6f2798e45099 | 267 | } |
ampembeng | 15:6f2798e45099 | 268 | } |
ampembeng | 15:6f2798e45099 | 269 | |
ampembeng | 15:6f2798e45099 | 270 | void initializeRecords(MQTTClient_t *pClient) { |
ampembeng | 15:6f2798e45099 | 271 | uint8_t i; |
ampembeng | 15:6f2798e45099 | 272 | for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { |
ampembeng | 15:6f2798e45099 | 273 | AckWaitList[i].isFree = true; |
ampembeng | 15:6f2798e45099 | 274 | } |
ampembeng | 15:6f2798e45099 | 275 | for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { |
ampembeng | 15:6f2798e45099 | 276 | SubscriptionList[i].isFree = true; |
ampembeng | 15:6f2798e45099 | 277 | SubscriptionList[i].count = 0; |
ampembeng | 15:6f2798e45099 | 278 | SubscriptionList[i].isSticky = false; |
ampembeng | 15:6f2798e45099 | 279 | } |
ampembeng | 15:6f2798e45099 | 280 | pMqttClient = pClient; |
ampembeng | 15:6f2798e45099 | 281 | } |
ampembeng | 15:6f2798e45099 | 282 | |
ampembeng | 15:6f2798e45099 | 283 | bool isSubscriptionPresent(const char *pThingName, ShadowActions_t action) { |
ampembeng | 15:6f2798e45099 | 284 | |
ampembeng | 15:6f2798e45099 | 285 | uint8_t i = 0; |
ampembeng | 15:6f2798e45099 | 286 | bool isAcceptedPresent = false; |
ampembeng | 15:6f2798e45099 | 287 | bool isRejectedPresent = false; |
ampembeng | 15:6f2798e45099 | 288 | char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES]; |
ampembeng | 15:6f2798e45099 | 289 | char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES]; |
ampembeng | 15:6f2798e45099 | 290 | |
ampembeng | 15:6f2798e45099 | 291 | topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED); |
ampembeng | 15:6f2798e45099 | 292 | topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED); |
ampembeng | 15:6f2798e45099 | 293 | |
ampembeng | 15:6f2798e45099 | 294 | for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { |
ampembeng | 15:6f2798e45099 | 295 | if (!SubscriptionList[i].isFree) { |
ampembeng | 15:6f2798e45099 | 296 | if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)) { |
ampembeng | 15:6f2798e45099 | 297 | isAcceptedPresent = true; |
ampembeng | 15:6f2798e45099 | 298 | } else if ((strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) { |
ampembeng | 15:6f2798e45099 | 299 | isRejectedPresent = true; |
ampembeng | 15:6f2798e45099 | 300 | } |
ampembeng | 15:6f2798e45099 | 301 | } |
ampembeng | 15:6f2798e45099 | 302 | } |
ampembeng | 15:6f2798e45099 | 303 | |
ampembeng | 15:6f2798e45099 | 304 | if (isRejectedPresent && isAcceptedPresent) { |
ampembeng | 15:6f2798e45099 | 305 | return true; |
ampembeng | 15:6f2798e45099 | 306 | } |
ampembeng | 15:6f2798e45099 | 307 | |
ampembeng | 15:6f2798e45099 | 308 | return false; |
ampembeng | 15:6f2798e45099 | 309 | } |
ampembeng | 15:6f2798e45099 | 310 | |
ampembeng | 15:6f2798e45099 | 311 | IoT_Error_t subscribeToShadowActionAcks(const char *pThingName, ShadowActions_t action, bool isSticky) { |
ampembeng | 15:6f2798e45099 | 312 | IoT_Error_t ret_val = NONE_ERROR; |
ampembeng | 15:6f2798e45099 | 313 | MQTTSubscribeParams subParams = MQTTSubscribeParamsDefault; |
ampembeng | 15:6f2798e45099 | 314 | |
ampembeng | 15:6f2798e45099 | 315 | bool clearBothEntriesFromList = true; |
ampembeng | 15:6f2798e45099 | 316 | int16_t indexAcceptedSubList = 0; |
ampembeng | 15:6f2798e45099 | 317 | int16_t indexRejectedSubList = 0; |
ampembeng | 15:6f2798e45099 | 318 | indexAcceptedSubList = getNextFreeIndexOfSubscriptionList(); |
ampembeng | 15:6f2798e45099 | 319 | indexRejectedSubList = getNextFreeIndexOfSubscriptionList(); |
ampembeng | 15:6f2798e45099 | 320 | |
ampembeng | 15:6f2798e45099 | 321 | if (indexAcceptedSubList >= 0 && indexRejectedSubList >= 0) { |
ampembeng | 15:6f2798e45099 | 322 | topicNameFromThingAndAction(SubscriptionList[indexAcceptedSubList].Topic, pThingName, action, SHADOW_ACCEPTED); |
ampembeng | 15:6f2798e45099 | 323 | subParams.mHandler = AckStatusCallback; |
ampembeng | 15:6f2798e45099 | 324 | subParams.qos = QOS_0; |
ampembeng | 15:6f2798e45099 | 325 | subParams.pTopic = SubscriptionList[indexAcceptedSubList].Topic; |
ampembeng | 15:6f2798e45099 | 326 | ret_val = pMqttClient->subscribe(&subParams); |
ampembeng | 15:6f2798e45099 | 327 | if (ret_val == NONE_ERROR) { |
ampembeng | 15:6f2798e45099 | 328 | SubscriptionList[indexAcceptedSubList].count = 1; |
ampembeng | 15:6f2798e45099 | 329 | SubscriptionList[indexAcceptedSubList].isSticky = isSticky; |
ampembeng | 15:6f2798e45099 | 330 | topicNameFromThingAndAction(SubscriptionList[indexRejectedSubList].Topic, pThingName, action, |
ampembeng | 15:6f2798e45099 | 331 | SHADOW_REJECTED); |
ampembeng | 15:6f2798e45099 | 332 | subParams.pTopic = SubscriptionList[indexRejectedSubList].Topic; |
ampembeng | 15:6f2798e45099 | 333 | ret_val = pMqttClient->subscribe(&subParams); |
ampembeng | 15:6f2798e45099 | 334 | if (ret_val == NONE_ERROR) { |
ampembeng | 15:6f2798e45099 | 335 | SubscriptionList[indexRejectedSubList].count = 1; |
ampembeng | 15:6f2798e45099 | 336 | SubscriptionList[indexRejectedSubList].isSticky = isSticky; |
ampembeng | 15:6f2798e45099 | 337 | clearBothEntriesFromList = false; |
ampembeng | 15:6f2798e45099 | 338 | |
ampembeng | 15:6f2798e45099 | 339 | // wait for SUBSCRIBE_SETTLING_TIME seconds to let the subscription take effect |
ampembeng | 15:6f2798e45099 | 340 | Timer subSettlingtimer; |
ampembeng | 15:6f2798e45099 | 341 | InitTimer(&subSettlingtimer); |
ampembeng | 15:6f2798e45099 | 342 | countdown(&subSettlingtimer, SUBSCRIBE_SETTLING_TIME); |
ampembeng | 15:6f2798e45099 | 343 | while(!expired(&subSettlingtimer)); |
ampembeng | 15:6f2798e45099 | 344 | |
ampembeng | 15:6f2798e45099 | 345 | } |
ampembeng | 15:6f2798e45099 | 346 | } |
ampembeng | 15:6f2798e45099 | 347 | } |
ampembeng | 15:6f2798e45099 | 348 | |
ampembeng | 15:6f2798e45099 | 349 | if (clearBothEntriesFromList) { |
ampembeng | 15:6f2798e45099 | 350 | if (indexAcceptedSubList >= 0) { |
ampembeng | 15:6f2798e45099 | 351 | SubscriptionList[indexAcceptedSubList].isFree = true; |
ampembeng | 15:6f2798e45099 | 352 | } else if (indexRejectedSubList >= 0) { |
ampembeng | 15:6f2798e45099 | 353 | SubscriptionList[indexRejectedSubList].isFree = true; |
ampembeng | 15:6f2798e45099 | 354 | } |
ampembeng | 15:6f2798e45099 | 355 | if (SubscriptionList[indexAcceptedSubList].count == 1) { |
ampembeng | 15:6f2798e45099 | 356 | pMqttClient->unsubscribe(SubscriptionList[indexAcceptedSubList].Topic); |
ampembeng | 15:6f2798e45099 | 357 | } |
ampembeng | 15:6f2798e45099 | 358 | } |
ampembeng | 15:6f2798e45099 | 359 | |
ampembeng | 15:6f2798e45099 | 360 | return ret_val; |
ampembeng | 15:6f2798e45099 | 361 | } |
ampembeng | 15:6f2798e45099 | 362 | |
ampembeng | 15:6f2798e45099 | 363 | void incrementSubscriptionCnt(const char *pThingName, ShadowActions_t action, bool isSticky) { |
ampembeng | 15:6f2798e45099 | 364 | char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES]; |
ampembeng | 15:6f2798e45099 | 365 | char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES]; |
ampembeng | 15:6f2798e45099 | 366 | uint8_t i; |
ampembeng | 15:6f2798e45099 | 367 | topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED); |
ampembeng | 15:6f2798e45099 | 368 | topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED); |
ampembeng | 15:6f2798e45099 | 369 | |
ampembeng | 15:6f2798e45099 | 370 | for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { |
ampembeng | 15:6f2798e45099 | 371 | if (!SubscriptionList[i].isFree) { |
ampembeng | 15:6f2798e45099 | 372 | if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0) |
ampembeng | 15:6f2798e45099 | 373 | || (strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) { |
ampembeng | 15:6f2798e45099 | 374 | SubscriptionList[i].count++; |
ampembeng | 15:6f2798e45099 | 375 | SubscriptionList[i].isSticky = isSticky; |
ampembeng | 15:6f2798e45099 | 376 | } |
ampembeng | 15:6f2798e45099 | 377 | } |
ampembeng | 15:6f2798e45099 | 378 | } |
ampembeng | 15:6f2798e45099 | 379 | } |
ampembeng | 15:6f2798e45099 | 380 | |
ampembeng | 15:6f2798e45099 | 381 | IoT_Error_t publishToShadowAction(const char * pThingName, ShadowActions_t action, const char *pJsonDocumentToBeSent) { |
ampembeng | 15:6f2798e45099 | 382 | IoT_Error_t ret_val = NONE_ERROR; |
ampembeng | 15:6f2798e45099 | 383 | char TemporaryTopicName[MAX_SHADOW_TOPIC_LENGTH_BYTES]; |
ampembeng | 15:6f2798e45099 | 384 | topicNameFromThingAndAction(TemporaryTopicName, pThingName, action, SHADOW_ACTION); |
ampembeng | 15:6f2798e45099 | 385 | |
ampembeng | 15:6f2798e45099 | 386 | MQTTPublishParams pubParams = MQTTPublishParamsDefault; |
ampembeng | 15:6f2798e45099 | 387 | pubParams.pTopic = TemporaryTopicName; |
ampembeng | 15:6f2798e45099 | 388 | MQTTMessageParams msgParams = MQTTMessageParamsDefault; |
ampembeng | 15:6f2798e45099 | 389 | msgParams.qos = QOS_0; |
ampembeng | 15:6f2798e45099 | 390 | msgParams.PayloadLen = strlen(pJsonDocumentToBeSent) + 1; |
ampembeng | 15:6f2798e45099 | 391 | msgParams.pPayload = (char *) pJsonDocumentToBeSent; |
ampembeng | 15:6f2798e45099 | 392 | pubParams.MessageParams = msgParams; |
ampembeng | 15:6f2798e45099 | 393 | ret_val = pMqttClient->publish(&pubParams); |
ampembeng | 15:6f2798e45099 | 394 | |
ampembeng | 15:6f2798e45099 | 395 | return ret_val; |
ampembeng | 15:6f2798e45099 | 396 | } |
ampembeng | 15:6f2798e45099 | 397 | |
ampembeng | 15:6f2798e45099 | 398 | bool getNextFreeIndexOfAckWaitList(uint8_t *pIndex) { |
ampembeng | 15:6f2798e45099 | 399 | uint8_t i; |
ampembeng | 15:6f2798e45099 | 400 | if (pIndex != NULL) { |
ampembeng | 15:6f2798e45099 | 401 | for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { |
ampembeng | 15:6f2798e45099 | 402 | if (AckWaitList[i].isFree) { |
ampembeng | 15:6f2798e45099 | 403 | *pIndex = i; |
ampembeng | 15:6f2798e45099 | 404 | return true; |
ampembeng | 15:6f2798e45099 | 405 | } |
ampembeng | 15:6f2798e45099 | 406 | } |
ampembeng | 15:6f2798e45099 | 407 | } |
ampembeng | 15:6f2798e45099 | 408 | return false; |
ampembeng | 15:6f2798e45099 | 409 | } |
ampembeng | 15:6f2798e45099 | 410 | |
ampembeng | 15:6f2798e45099 | 411 | void addToAckWaitList(uint8_t indexAckWaitList, const char *pThingName, ShadowActions_t action, |
ampembeng | 15:6f2798e45099 | 412 | const char *pExtractedClientToken, fpActionCallback_t callback, void *pCallbackContext, |
ampembeng | 15:6f2798e45099 | 413 | uint32_t timeout_seconds) { |
ampembeng | 15:6f2798e45099 | 414 | AckWaitList[indexAckWaitList].callback = callback; |
ampembeng | 15:6f2798e45099 | 415 | strncpy(AckWaitList[indexAckWaitList].clientTokenID, pExtractedClientToken, MAX_SIZE_CLIENT_ID_WITH_SEQUENCE); |
ampembeng | 15:6f2798e45099 | 416 | strncpy(AckWaitList[indexAckWaitList].thingName, pThingName, MAX_SIZE_OF_THING_NAME); |
ampembeng | 15:6f2798e45099 | 417 | AckWaitList[indexAckWaitList].pCallbackContext = pCallbackContext; |
ampembeng | 15:6f2798e45099 | 418 | AckWaitList[indexAckWaitList].action = action; |
ampembeng | 15:6f2798e45099 | 419 | InitTimer(&(AckWaitList[indexAckWaitList].timer)); |
ampembeng | 15:6f2798e45099 | 420 | countdown(&(AckWaitList[indexAckWaitList].timer), timeout_seconds); |
ampembeng | 15:6f2798e45099 | 421 | AckWaitList[indexAckWaitList].isFree = false; |
ampembeng | 15:6f2798e45099 | 422 | } |
ampembeng | 15:6f2798e45099 | 423 | |
ampembeng | 15:6f2798e45099 | 424 | void HandleExpiredResponseCallbacks(void) { |
ampembeng | 15:6f2798e45099 | 425 | uint8_t i; |
ampembeng | 15:6f2798e45099 | 426 | for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { |
ampembeng | 15:6f2798e45099 | 427 | if (!AckWaitList[i].isFree) { |
ampembeng | 15:6f2798e45099 | 428 | if (expired(&(AckWaitList[i].timer))) { |
ampembeng | 15:6f2798e45099 | 429 | if (AckWaitList[i].callback != NULL) { |
ampembeng | 15:6f2798e45099 | 430 | AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, SHADOW_ACK_TIMEOUT, |
ampembeng | 15:6f2798e45099 | 431 | shadowRxBuf, AckWaitList[i].pCallbackContext); |
ampembeng | 15:6f2798e45099 | 432 | } |
ampembeng | 15:6f2798e45099 | 433 | AckWaitList[i].isFree = true; |
ampembeng | 15:6f2798e45099 | 434 | unsubscribeFromAcceptedAndRejected(i); |
ampembeng | 15:6f2798e45099 | 435 | } |
ampembeng | 15:6f2798e45099 | 436 | } |
ampembeng | 15:6f2798e45099 | 437 | } |
ampembeng | 15:6f2798e45099 | 438 | } |
ampembeng | 15:6f2798e45099 | 439 | |
ampembeng | 15:6f2798e45099 | 440 | static int shadow_delta_callback(MQTTCallbackParams params) { |
ampembeng | 15:6f2798e45099 | 441 | |
ampembeng | 15:6f2798e45099 | 442 | int32_t tokenCount; |
ampembeng | 15:6f2798e45099 | 443 | uint32_t i = 0; |
ampembeng | 15:6f2798e45099 | 444 | void *pJsonHandler; |
ampembeng | 15:6f2798e45099 | 445 | int32_t DataPosition; |
ampembeng | 15:6f2798e45099 | 446 | uint32_t dataLength; |
ampembeng | 15:6f2798e45099 | 447 | |
ampembeng | 15:6f2798e45099 | 448 | if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) { |
ampembeng | 15:6f2798e45099 | 449 | return GENERIC_ERROR; |
ampembeng | 15:6f2798e45099 | 450 | } |
ampembeng | 15:6f2798e45099 | 451 | |
ampembeng | 15:6f2798e45099 | 452 | memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen); |
ampembeng | 15:6f2798e45099 | 453 | shadowRxBuf[params.MessageParams.PayloadLen] = '\0'; // jsmn_parse relies on a string |
ampembeng | 15:6f2798e45099 | 454 | |
ampembeng | 15:6f2798e45099 | 455 | if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) { |
ampembeng | 15:6f2798e45099 | 456 | WARN("Received JSON is not valid"); |
ampembeng | 15:6f2798e45099 | 457 | return GENERIC_ERROR; |
ampembeng | 15:6f2798e45099 | 458 | } |
ampembeng | 15:6f2798e45099 | 459 | |
ampembeng | 15:6f2798e45099 | 460 | if (shadowDiscardOldDeltaFlag) { |
ampembeng | 15:6f2798e45099 | 461 | uint32_t tempVersionNumber = 0; |
ampembeng | 15:6f2798e45099 | 462 | if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) { |
ampembeng | 15:6f2798e45099 | 463 | if (tempVersionNumber > shadowJsonVersionNum) { |
ampembeng | 15:6f2798e45099 | 464 | shadowJsonVersionNum = tempVersionNumber; |
ampembeng | 15:6f2798e45099 | 465 | DEBUG("New Version number: %d", shadowJsonVersionNum); |
ampembeng | 15:6f2798e45099 | 466 | } else { |
ampembeng | 15:6f2798e45099 | 467 | WARN("Old Delta Message received - Ignoring rx: %d local: %d", tempVersionNumber, shadowJsonVersionNum); |
ampembeng | 15:6f2798e45099 | 468 | return GENERIC_ERROR; |
ampembeng | 15:6f2798e45099 | 469 | } |
ampembeng | 15:6f2798e45099 | 470 | } |
ampembeng | 15:6f2798e45099 | 471 | } |
ampembeng | 15:6f2798e45099 | 472 | |
ampembeng | 15:6f2798e45099 | 473 | for (i = 0; i < tokenTableIndex; i++) { |
ampembeng | 15:6f2798e45099 | 474 | if (!tokenTable[i].isFree) { |
ampembeng | 15:6f2798e45099 | 475 | if (isJsonKeyMatchingAndUpdateValue(shadowRxBuf, pJsonHandler, tokenCount, (jsonStruct_t *)tokenTable[i].pStruct, |
ampembeng | 15:6f2798e45099 | 476 | &dataLength, &DataPosition)) { |
ampembeng | 15:6f2798e45099 | 477 | if (tokenTable[i].callback != NULL) { |
ampembeng | 15:6f2798e45099 | 478 | tokenTable[i].callback(shadowRxBuf + DataPosition, dataLength, (jsonStruct_t *)tokenTable[i].pStruct); |
ampembeng | 15:6f2798e45099 | 479 | } |
ampembeng | 15:6f2798e45099 | 480 | } |
ampembeng | 15:6f2798e45099 | 481 | } |
ampembeng | 15:6f2798e45099 | 482 | } |
ampembeng | 15:6f2798e45099 | 483 | |
ampembeng | 15:6f2798e45099 | 484 | return NONE_ERROR; |
ampembeng | 15:6f2798e45099 | 485 | } |
ampembeng | 15:6f2798e45099 | 486 |