Demo application for using the AT&T IoT Starter Kit Powered by AWS.

Dependencies:   SDFileSystem

Fork of ATT_AWS_IoT_demo by Anthony Phillips

IoT Starter Kit Powered by AWS Demo

This program demonstrates the AT&T IoT Starter Kit sending data directly into AWS IoT. It's explained and used in the Getting Started with the IoT Starter Kit Powered by AWS on starterkit.att.com.

What's required

  • AT&T IoT LTE Add-on (also known as the Cellular Shield)
  • NXP K64F - for programming
  • microSD card - used to store your AWS security credentials
  • AWS account
  • Python, locally installed

If you don't already have an IoT Starter Kit, you can purchase a kit here. The IoT Starter Kit Powered by AWS includes the LTE cellular shield, K64F, and a microSD card.

Committer:
rfinn
Date:
Tue Feb 07 16:18:57 2017 +0000
Revision:
27:2f486c766854
Parent:
15:6f2798e45099
changed SDFileSystem library

Who changed what in which revision?

UserRevisionLine numberNew contents of line
ampembeng 15:6f2798e45099 1 /*
ampembeng 15:6f2798e45099 2 * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
ampembeng 15:6f2798e45099 3 *
ampembeng 15:6f2798e45099 4 * Licensed under the Apache License, Version 2.0 (the "License").
ampembeng 15:6f2798e45099 5 * You may not use this file except in compliance with the License.
ampembeng 15:6f2798e45099 6 * A copy of the License is located at
ampembeng 15:6f2798e45099 7 *
ampembeng 15:6f2798e45099 8 * http://aws.amazon.com/apache2.0
ampembeng 15:6f2798e45099 9 *
ampembeng 15:6f2798e45099 10 * or in the "license" file accompanying this file. This file is distributed
ampembeng 15:6f2798e45099 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
ampembeng 15:6f2798e45099 12 * express or implied. See the License for the specific language governing
ampembeng 15:6f2798e45099 13 * permissions and limitations under the License.
ampembeng 15:6f2798e45099 14 */
ampembeng 15:6f2798e45099 15
ampembeng 15:6f2798e45099 16 #include "aws_iot_shadow_records.h"
ampembeng 15:6f2798e45099 17
ampembeng 15:6f2798e45099 18 #include <string.h>
ampembeng 15:6f2798e45099 19 #include <stdio.h>
ampembeng 15:6f2798e45099 20
ampembeng 15:6f2798e45099 21 #include "timer_interface.h"
ampembeng 15:6f2798e45099 22 #include "aws_iot_json_utils.h"
ampembeng 15:6f2798e45099 23 #include "aws_iot_log.h"
ampembeng 15:6f2798e45099 24 #include "aws_iot_shadow_json.h"
ampembeng 15:6f2798e45099 25 #include "aws_iot_config.h"
ampembeng 15:6f2798e45099 26
ampembeng 15:6f2798e45099 27 typedef struct {
ampembeng 15:6f2798e45099 28 char clientTokenID[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE];
ampembeng 15:6f2798e45099 29 char thingName[MAX_SIZE_OF_THING_NAME];
ampembeng 15:6f2798e45099 30 ShadowActions_t action;
ampembeng 15:6f2798e45099 31 fpActionCallback_t callback;
ampembeng 15:6f2798e45099 32 void *pCallbackContext;
ampembeng 15:6f2798e45099 33 bool isFree;
ampembeng 15:6f2798e45099 34 Timer timer;
ampembeng 15:6f2798e45099 35 } ToBeReceivedAckRecord_t;
ampembeng 15:6f2798e45099 36
ampembeng 15:6f2798e45099 37 typedef struct {
ampembeng 15:6f2798e45099 38 const char *pKey;
ampembeng 15:6f2798e45099 39 void *pStruct;
ampembeng 15:6f2798e45099 40 jsonStructCallback_t callback;
ampembeng 15:6f2798e45099 41 bool isFree;
ampembeng 15:6f2798e45099 42 } JsonTokenTable_t;
ampembeng 15:6f2798e45099 43
ampembeng 15:6f2798e45099 44 typedef struct {
ampembeng 15:6f2798e45099 45 char Topic[MAX_SHADOW_TOPIC_LENGTH_BYTES];
ampembeng 15:6f2798e45099 46 uint8_t count;
ampembeng 15:6f2798e45099 47 bool isFree;
ampembeng 15:6f2798e45099 48 bool isSticky;
ampembeng 15:6f2798e45099 49 } SubscriptionRecord_t;
ampembeng 15:6f2798e45099 50
ampembeng 15:6f2798e45099 51 typedef enum {
ampembeng 15:6f2798e45099 52 SHADOW_ACCEPTED, SHADOW_REJECTED, SHADOW_ACTION
ampembeng 15:6f2798e45099 53 } ShadowAckTopicTypes_t;
ampembeng 15:6f2798e45099 54
ampembeng 15:6f2798e45099 55 ToBeReceivedAckRecord_t AckWaitList[MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME];
ampembeng 15:6f2798e45099 56
ampembeng 15:6f2798e45099 57 MQTTClient_t *pMqttClient;
ampembeng 15:6f2798e45099 58
ampembeng 15:6f2798e45099 59 char myThingName[MAX_SIZE_OF_THING_NAME];
ampembeng 15:6f2798e45099 60 char mqttClientID[MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES];
ampembeng 15:6f2798e45099 61
ampembeng 15:6f2798e45099 62 char shadowDeltaTopic[MAX_SHADOW_TOPIC_LENGTH_BYTES];
ampembeng 15:6f2798e45099 63
ampembeng 15:6f2798e45099 64 #define MAX_TOPICS_AT_ANY_GIVEN_TIME 2*MAX_THINGNAME_HANDLED_AT_ANY_GIVEN_TIME
ampembeng 15:6f2798e45099 65 SubscriptionRecord_t SubscriptionList[MAX_TOPICS_AT_ANY_GIVEN_TIME];
ampembeng 15:6f2798e45099 66
ampembeng 15:6f2798e45099 67 #define SUBSCRIBE_SETTLING_TIME 2
ampembeng 15:6f2798e45099 68 char shadowRxBuf[SHADOW_MAX_SIZE_OF_RX_BUFFER];
ampembeng 15:6f2798e45099 69
ampembeng 15:6f2798e45099 70 static JsonTokenTable_t tokenTable[MAX_JSON_TOKEN_EXPECTED];
ampembeng 15:6f2798e45099 71 static uint32_t tokenTableIndex = 0;
ampembeng 15:6f2798e45099 72 static bool deltaTopicSubscribedFlag = false;
ampembeng 15:6f2798e45099 73 uint32_t shadowJsonVersionNum = 0;
ampembeng 15:6f2798e45099 74 bool shadowDiscardOldDeltaFlag = true;
ampembeng 15:6f2798e45099 75
ampembeng 15:6f2798e45099 76 // local helper functions
ampembeng 15:6f2798e45099 77 static int AckStatusCallback(MQTTCallbackParams params);
ampembeng 15:6f2798e45099 78 static int shadow_delta_callback(MQTTCallbackParams params);
ampembeng 15:6f2798e45099 79 static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action,
ampembeng 15:6f2798e45099 80 ShadowAckTopicTypes_t ackType);
ampembeng 15:6f2798e45099 81 static int16_t getNextFreeIndexOfSubscriptionList(void);
ampembeng 15:6f2798e45099 82 static void unsubscribeFromAcceptedAndRejected(uint8_t index);
ampembeng 15:6f2798e45099 83
ampembeng 15:6f2798e45099 84 void initDeltaTokens(void) {
ampembeng 15:6f2798e45099 85 uint32_t i;
ampembeng 15:6f2798e45099 86 for (i = 0; i < MAX_JSON_TOKEN_EXPECTED; i++) {
ampembeng 15:6f2798e45099 87 tokenTable[i].isFree = true;
ampembeng 15:6f2798e45099 88 }
ampembeng 15:6f2798e45099 89 tokenTableIndex = 0;
ampembeng 15:6f2798e45099 90 deltaTopicSubscribedFlag = false;
ampembeng 15:6f2798e45099 91 }
ampembeng 15:6f2798e45099 92
ampembeng 15:6f2798e45099 93 IoT_Error_t registerJsonTokenOnDelta(jsonStruct_t *pStruct) {
ampembeng 15:6f2798e45099 94
ampembeng 15:6f2798e45099 95 IoT_Error_t rc = NONE_ERROR;
ampembeng 15:6f2798e45099 96
ampembeng 15:6f2798e45099 97 if (!deltaTopicSubscribedFlag) {
ampembeng 15:6f2798e45099 98 MQTTSubscribeParams subParams;
ampembeng 15:6f2798e45099 99 subParams.mHandler = shadow_delta_callback;
ampembeng 15:6f2798e45099 100 snprintf(shadowDeltaTopic,MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/update/delta", myThingName);
ampembeng 15:6f2798e45099 101 subParams.pTopic = shadowDeltaTopic;
ampembeng 15:6f2798e45099 102 subParams.qos = QOS_0;
ampembeng 15:6f2798e45099 103 rc = pMqttClient->subscribe(&subParams);
ampembeng 15:6f2798e45099 104 DEBUG("delta topic %s", shadowDeltaTopic);
ampembeng 15:6f2798e45099 105 deltaTopicSubscribedFlag = true;
ampembeng 15:6f2798e45099 106 }
ampembeng 15:6f2798e45099 107
ampembeng 15:6f2798e45099 108 if (tokenTableIndex >= MAX_JSON_TOKEN_EXPECTED) {
ampembeng 15:6f2798e45099 109 return GENERIC_ERROR;
ampembeng 15:6f2798e45099 110 }
ampembeng 15:6f2798e45099 111
ampembeng 15:6f2798e45099 112 tokenTable[tokenTableIndex].pKey = pStruct->pKey;
ampembeng 15:6f2798e45099 113 tokenTable[tokenTableIndex].callback = pStruct->cb;
ampembeng 15:6f2798e45099 114 tokenTable[tokenTableIndex].pStruct = pStruct;
ampembeng 15:6f2798e45099 115 tokenTable[tokenTableIndex].isFree = false;
ampembeng 15:6f2798e45099 116 tokenTableIndex++;
ampembeng 15:6f2798e45099 117
ampembeng 15:6f2798e45099 118 return rc;
ampembeng 15:6f2798e45099 119 }
ampembeng 15:6f2798e45099 120
ampembeng 15:6f2798e45099 121 static int16_t getNextFreeIndexOfSubscriptionList(void) {
ampembeng 15:6f2798e45099 122 uint8_t i;
ampembeng 15:6f2798e45099 123 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
ampembeng 15:6f2798e45099 124 if (SubscriptionList[i].isFree) {
ampembeng 15:6f2798e45099 125 SubscriptionList[i].isFree = false;
ampembeng 15:6f2798e45099 126 return i;
ampembeng 15:6f2798e45099 127 }
ampembeng 15:6f2798e45099 128 }
ampembeng 15:6f2798e45099 129 return -1;
ampembeng 15:6f2798e45099 130 }
ampembeng 15:6f2798e45099 131
ampembeng 15:6f2798e45099 132 static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action,
ampembeng 15:6f2798e45099 133 ShadowAckTopicTypes_t ackType) {
ampembeng 15:6f2798e45099 134
ampembeng 15:6f2798e45099 135 char actionBuf[10];
ampembeng 15:6f2798e45099 136 char ackTypeBuf[10];
ampembeng 15:6f2798e45099 137
ampembeng 15:6f2798e45099 138 if (action == SHADOW_GET) {
ampembeng 15:6f2798e45099 139 strcpy(actionBuf, "get");
ampembeng 15:6f2798e45099 140 } else if (action == SHADOW_UPDATE) {
ampembeng 15:6f2798e45099 141 strcpy(actionBuf, "update");
ampembeng 15:6f2798e45099 142 } else if (action == SHADOW_DELETE) {
ampembeng 15:6f2798e45099 143 strcpy(actionBuf, "delete");
ampembeng 15:6f2798e45099 144 }
ampembeng 15:6f2798e45099 145
ampembeng 15:6f2798e45099 146 if (ackType == SHADOW_ACCEPTED) {
ampembeng 15:6f2798e45099 147 strcpy(ackTypeBuf, "accepted");
ampembeng 15:6f2798e45099 148 } else if (ackType == SHADOW_REJECTED) {
ampembeng 15:6f2798e45099 149 strcpy(ackTypeBuf, "rejected");
ampembeng 15:6f2798e45099 150 }
ampembeng 15:6f2798e45099 151
ampembeng 15:6f2798e45099 152 if (ackType == SHADOW_ACTION) {
ampembeng 15:6f2798e45099 153 sprintf(pTopic, "$aws/things/%s/shadow/%s", pThingName, actionBuf);
ampembeng 15:6f2798e45099 154 } else {
ampembeng 15:6f2798e45099 155 sprintf(pTopic, "$aws/things/%s/shadow/%s/%s", pThingName, actionBuf, ackTypeBuf);
ampembeng 15:6f2798e45099 156 }
ampembeng 15:6f2798e45099 157 }
ampembeng 15:6f2798e45099 158
ampembeng 15:6f2798e45099 159 static bool isAckForMyThingName(const char *pTopicName) {
ampembeng 15:6f2798e45099 160 if (strstr(pTopicName, myThingName) != NULL && ((strstr(pTopicName, "get/accepted") != NULL) || (strstr(pTopicName, "delta") != NULL))) {
ampembeng 15:6f2798e45099 161 return true;
ampembeng 15:6f2798e45099 162 }
ampembeng 15:6f2798e45099 163 return false;
ampembeng 15:6f2798e45099 164 }
ampembeng 15:6f2798e45099 165
ampembeng 15:6f2798e45099 166 static int AckStatusCallback(MQTTCallbackParams params) {
ampembeng 15:6f2798e45099 167 int32_t tokenCount;
ampembeng 15:6f2798e45099 168 int32_t i;
ampembeng 15:6f2798e45099 169 void *pJsonHandler;
ampembeng 15:6f2798e45099 170 char temporaryClientToken[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE];
ampembeng 15:6f2798e45099 171
ampembeng 15:6f2798e45099 172 if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) {
ampembeng 15:6f2798e45099 173 return GENERIC_ERROR;
ampembeng 15:6f2798e45099 174 }
ampembeng 15:6f2798e45099 175
ampembeng 15:6f2798e45099 176 memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen);
ampembeng 15:6f2798e45099 177 shadowRxBuf[params.MessageParams.PayloadLen] = '\0'; // jsmn_parse relies on a string
ampembeng 15:6f2798e45099 178
ampembeng 15:6f2798e45099 179 if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) {
ampembeng 15:6f2798e45099 180 WARN("Received JSON is not valid");
ampembeng 15:6f2798e45099 181 return GENERIC_ERROR;
ampembeng 15:6f2798e45099 182 }
ampembeng 15:6f2798e45099 183
ampembeng 15:6f2798e45099 184 if (isAckForMyThingName(params.pTopicName)) {
ampembeng 15:6f2798e45099 185 uint32_t tempVersionNumber = 0;
ampembeng 15:6f2798e45099 186 if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) {
ampembeng 15:6f2798e45099 187 if (tempVersionNumber > shadowJsonVersionNum) {
ampembeng 15:6f2798e45099 188 shadowJsonVersionNum = tempVersionNumber;
ampembeng 15:6f2798e45099 189 }
ampembeng 15:6f2798e45099 190 }
ampembeng 15:6f2798e45099 191 }
ampembeng 15:6f2798e45099 192
ampembeng 15:6f2798e45099 193 if (extractClientToken(shadowRxBuf, temporaryClientToken)) {
ampembeng 15:6f2798e45099 194 for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
ampembeng 15:6f2798e45099 195 if (!AckWaitList[i].isFree) {
ampembeng 15:6f2798e45099 196 if (strcmp(AckWaitList[i].clientTokenID, temporaryClientToken) == 0) {
ampembeng 15:6f2798e45099 197 Shadow_Ack_Status_t status;
ampembeng 15:6f2798e45099 198 if (strstr(params.pTopicName, "accepted") != NULL) {
ampembeng 15:6f2798e45099 199 status = SHADOW_ACK_ACCEPTED;
ampembeng 15:6f2798e45099 200 } else if (strstr(params.pTopicName, "rejected") != NULL) {
ampembeng 15:6f2798e45099 201 status = SHADOW_ACK_REJECTED;
ampembeng 15:6f2798e45099 202 }
ampembeng 15:6f2798e45099 203 if (status == SHADOW_ACK_ACCEPTED || status == SHADOW_ACK_REJECTED) {
ampembeng 15:6f2798e45099 204 if (AckWaitList[i].callback != NULL) {
ampembeng 15:6f2798e45099 205 AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, status,
ampembeng 15:6f2798e45099 206 shadowRxBuf, AckWaitList[i].pCallbackContext);
ampembeng 15:6f2798e45099 207 }
ampembeng 15:6f2798e45099 208 unsubscribeFromAcceptedAndRejected(i);
ampembeng 15:6f2798e45099 209 AckWaitList[i].isFree = true;
ampembeng 15:6f2798e45099 210 return NONE_ERROR;
ampembeng 15:6f2798e45099 211 }
ampembeng 15:6f2798e45099 212 }
ampembeng 15:6f2798e45099 213 }
ampembeng 15:6f2798e45099 214 }
ampembeng 15:6f2798e45099 215 }
ampembeng 15:6f2798e45099 216
ampembeng 15:6f2798e45099 217 return GENERIC_ERROR;
ampembeng 15:6f2798e45099 218 }
ampembeng 15:6f2798e45099 219
ampembeng 15:6f2798e45099 220 static int16_t findIndexOfSubscriptionList(const char *pTopic) {
ampembeng 15:6f2798e45099 221 uint8_t i;
ampembeng 15:6f2798e45099 222 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
ampembeng 15:6f2798e45099 223 if (!SubscriptionList[i].isFree) {
ampembeng 15:6f2798e45099 224 if ((strcmp(pTopic, SubscriptionList[i].Topic) == 0)) {
ampembeng 15:6f2798e45099 225 return i;
ampembeng 15:6f2798e45099 226 }
ampembeng 15:6f2798e45099 227 }
ampembeng 15:6f2798e45099 228 }
ampembeng 15:6f2798e45099 229 return -1;
ampembeng 15:6f2798e45099 230 }
ampembeng 15:6f2798e45099 231
ampembeng 15:6f2798e45099 232 static void unsubscribeFromAcceptedAndRejected(uint8_t index) {
ampembeng 15:6f2798e45099 233
ampembeng 15:6f2798e45099 234 char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
ampembeng 15:6f2798e45099 235 char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
ampembeng 15:6f2798e45099 236 IoT_Error_t ret_val = NONE_ERROR;
ampembeng 15:6f2798e45099 237
ampembeng 15:6f2798e45099 238 topicNameFromThingAndAction(TemporaryTopicNameAccepted, AckWaitList[index].thingName, AckWaitList[index].action,
ampembeng 15:6f2798e45099 239 SHADOW_ACCEPTED);
ampembeng 15:6f2798e45099 240 topicNameFromThingAndAction(TemporaryTopicNameRejected, AckWaitList[index].thingName, AckWaitList[index].action,
ampembeng 15:6f2798e45099 241 SHADOW_REJECTED);
ampembeng 15:6f2798e45099 242
ampembeng 15:6f2798e45099 243 int16_t indexSubList;
ampembeng 15:6f2798e45099 244
ampembeng 15:6f2798e45099 245 indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameAccepted);
ampembeng 15:6f2798e45099 246 if ((indexSubList >= 0)) {
ampembeng 15:6f2798e45099 247 if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) {
ampembeng 15:6f2798e45099 248 ret_val = pMqttClient->unsubscribe(TemporaryTopicNameAccepted);
ampembeng 15:6f2798e45099 249 if (ret_val == NONE_ERROR) {
ampembeng 15:6f2798e45099 250 SubscriptionList[indexSubList].isFree = true;
ampembeng 15:6f2798e45099 251 }
ampembeng 15:6f2798e45099 252 } else if (SubscriptionList[indexSubList].count > 1) {
ampembeng 15:6f2798e45099 253 SubscriptionList[indexSubList].count--;
ampembeng 15:6f2798e45099 254 }
ampembeng 15:6f2798e45099 255 }
ampembeng 15:6f2798e45099 256
ampembeng 15:6f2798e45099 257 indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameRejected);
ampembeng 15:6f2798e45099 258 if ((indexSubList >= 0)) {
ampembeng 15:6f2798e45099 259 if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) {
ampembeng 15:6f2798e45099 260 ret_val = pMqttClient->unsubscribe(TemporaryTopicNameRejected);
ampembeng 15:6f2798e45099 261 if (ret_val == NONE_ERROR) {
ampembeng 15:6f2798e45099 262 SubscriptionList[indexSubList].isFree = true;
ampembeng 15:6f2798e45099 263 }
ampembeng 15:6f2798e45099 264 } else if (SubscriptionList[indexSubList].count > 1) {
ampembeng 15:6f2798e45099 265 SubscriptionList[indexSubList].count--;
ampembeng 15:6f2798e45099 266 }
ampembeng 15:6f2798e45099 267 }
ampembeng 15:6f2798e45099 268 }
ampembeng 15:6f2798e45099 269
ampembeng 15:6f2798e45099 270 void initializeRecords(MQTTClient_t *pClient) {
ampembeng 15:6f2798e45099 271 uint8_t i;
ampembeng 15:6f2798e45099 272 for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
ampembeng 15:6f2798e45099 273 AckWaitList[i].isFree = true;
ampembeng 15:6f2798e45099 274 }
ampembeng 15:6f2798e45099 275 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
ampembeng 15:6f2798e45099 276 SubscriptionList[i].isFree = true;
ampembeng 15:6f2798e45099 277 SubscriptionList[i].count = 0;
ampembeng 15:6f2798e45099 278 SubscriptionList[i].isSticky = false;
ampembeng 15:6f2798e45099 279 }
ampembeng 15:6f2798e45099 280 pMqttClient = pClient;
ampembeng 15:6f2798e45099 281 }
ampembeng 15:6f2798e45099 282
ampembeng 15:6f2798e45099 283 bool isSubscriptionPresent(const char *pThingName, ShadowActions_t action) {
ampembeng 15:6f2798e45099 284
ampembeng 15:6f2798e45099 285 uint8_t i = 0;
ampembeng 15:6f2798e45099 286 bool isAcceptedPresent = false;
ampembeng 15:6f2798e45099 287 bool isRejectedPresent = false;
ampembeng 15:6f2798e45099 288 char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
ampembeng 15:6f2798e45099 289 char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
ampembeng 15:6f2798e45099 290
ampembeng 15:6f2798e45099 291 topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED);
ampembeng 15:6f2798e45099 292 topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED);
ampembeng 15:6f2798e45099 293
ampembeng 15:6f2798e45099 294 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
ampembeng 15:6f2798e45099 295 if (!SubscriptionList[i].isFree) {
ampembeng 15:6f2798e45099 296 if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)) {
ampembeng 15:6f2798e45099 297 isAcceptedPresent = true;
ampembeng 15:6f2798e45099 298 } else if ((strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) {
ampembeng 15:6f2798e45099 299 isRejectedPresent = true;
ampembeng 15:6f2798e45099 300 }
ampembeng 15:6f2798e45099 301 }
ampembeng 15:6f2798e45099 302 }
ampembeng 15:6f2798e45099 303
ampembeng 15:6f2798e45099 304 if (isRejectedPresent && isAcceptedPresent) {
ampembeng 15:6f2798e45099 305 return true;
ampembeng 15:6f2798e45099 306 }
ampembeng 15:6f2798e45099 307
ampembeng 15:6f2798e45099 308 return false;
ampembeng 15:6f2798e45099 309 }
ampembeng 15:6f2798e45099 310
ampembeng 15:6f2798e45099 311 IoT_Error_t subscribeToShadowActionAcks(const char *pThingName, ShadowActions_t action, bool isSticky) {
ampembeng 15:6f2798e45099 312 IoT_Error_t ret_val = NONE_ERROR;
ampembeng 15:6f2798e45099 313 MQTTSubscribeParams subParams = MQTTSubscribeParamsDefault;
ampembeng 15:6f2798e45099 314
ampembeng 15:6f2798e45099 315 bool clearBothEntriesFromList = true;
ampembeng 15:6f2798e45099 316 int16_t indexAcceptedSubList = 0;
ampembeng 15:6f2798e45099 317 int16_t indexRejectedSubList = 0;
ampembeng 15:6f2798e45099 318 indexAcceptedSubList = getNextFreeIndexOfSubscriptionList();
ampembeng 15:6f2798e45099 319 indexRejectedSubList = getNextFreeIndexOfSubscriptionList();
ampembeng 15:6f2798e45099 320
ampembeng 15:6f2798e45099 321 if (indexAcceptedSubList >= 0 && indexRejectedSubList >= 0) {
ampembeng 15:6f2798e45099 322 topicNameFromThingAndAction(SubscriptionList[indexAcceptedSubList].Topic, pThingName, action, SHADOW_ACCEPTED);
ampembeng 15:6f2798e45099 323 subParams.mHandler = AckStatusCallback;
ampembeng 15:6f2798e45099 324 subParams.qos = QOS_0;
ampembeng 15:6f2798e45099 325 subParams.pTopic = SubscriptionList[indexAcceptedSubList].Topic;
ampembeng 15:6f2798e45099 326 ret_val = pMqttClient->subscribe(&subParams);
ampembeng 15:6f2798e45099 327 if (ret_val == NONE_ERROR) {
ampembeng 15:6f2798e45099 328 SubscriptionList[indexAcceptedSubList].count = 1;
ampembeng 15:6f2798e45099 329 SubscriptionList[indexAcceptedSubList].isSticky = isSticky;
ampembeng 15:6f2798e45099 330 topicNameFromThingAndAction(SubscriptionList[indexRejectedSubList].Topic, pThingName, action,
ampembeng 15:6f2798e45099 331 SHADOW_REJECTED);
ampembeng 15:6f2798e45099 332 subParams.pTopic = SubscriptionList[indexRejectedSubList].Topic;
ampembeng 15:6f2798e45099 333 ret_val = pMqttClient->subscribe(&subParams);
ampembeng 15:6f2798e45099 334 if (ret_val == NONE_ERROR) {
ampembeng 15:6f2798e45099 335 SubscriptionList[indexRejectedSubList].count = 1;
ampembeng 15:6f2798e45099 336 SubscriptionList[indexRejectedSubList].isSticky = isSticky;
ampembeng 15:6f2798e45099 337 clearBothEntriesFromList = false;
ampembeng 15:6f2798e45099 338
ampembeng 15:6f2798e45099 339 // wait for SUBSCRIBE_SETTLING_TIME seconds to let the subscription take effect
ampembeng 15:6f2798e45099 340 Timer subSettlingtimer;
ampembeng 15:6f2798e45099 341 InitTimer(&subSettlingtimer);
ampembeng 15:6f2798e45099 342 countdown(&subSettlingtimer, SUBSCRIBE_SETTLING_TIME);
ampembeng 15:6f2798e45099 343 while(!expired(&subSettlingtimer));
ampembeng 15:6f2798e45099 344
ampembeng 15:6f2798e45099 345 }
ampembeng 15:6f2798e45099 346 }
ampembeng 15:6f2798e45099 347 }
ampembeng 15:6f2798e45099 348
ampembeng 15:6f2798e45099 349 if (clearBothEntriesFromList) {
ampembeng 15:6f2798e45099 350 if (indexAcceptedSubList >= 0) {
ampembeng 15:6f2798e45099 351 SubscriptionList[indexAcceptedSubList].isFree = true;
ampembeng 15:6f2798e45099 352 } else if (indexRejectedSubList >= 0) {
ampembeng 15:6f2798e45099 353 SubscriptionList[indexRejectedSubList].isFree = true;
ampembeng 15:6f2798e45099 354 }
ampembeng 15:6f2798e45099 355 if (SubscriptionList[indexAcceptedSubList].count == 1) {
ampembeng 15:6f2798e45099 356 pMqttClient->unsubscribe(SubscriptionList[indexAcceptedSubList].Topic);
ampembeng 15:6f2798e45099 357 }
ampembeng 15:6f2798e45099 358 }
ampembeng 15:6f2798e45099 359
ampembeng 15:6f2798e45099 360 return ret_val;
ampembeng 15:6f2798e45099 361 }
ampembeng 15:6f2798e45099 362
ampembeng 15:6f2798e45099 363 void incrementSubscriptionCnt(const char *pThingName, ShadowActions_t action, bool isSticky) {
ampembeng 15:6f2798e45099 364 char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
ampembeng 15:6f2798e45099 365 char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
ampembeng 15:6f2798e45099 366 uint8_t i;
ampembeng 15:6f2798e45099 367 topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED);
ampembeng 15:6f2798e45099 368 topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED);
ampembeng 15:6f2798e45099 369
ampembeng 15:6f2798e45099 370 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
ampembeng 15:6f2798e45099 371 if (!SubscriptionList[i].isFree) {
ampembeng 15:6f2798e45099 372 if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)
ampembeng 15:6f2798e45099 373 || (strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) {
ampembeng 15:6f2798e45099 374 SubscriptionList[i].count++;
ampembeng 15:6f2798e45099 375 SubscriptionList[i].isSticky = isSticky;
ampembeng 15:6f2798e45099 376 }
ampembeng 15:6f2798e45099 377 }
ampembeng 15:6f2798e45099 378 }
ampembeng 15:6f2798e45099 379 }
ampembeng 15:6f2798e45099 380
ampembeng 15:6f2798e45099 381 IoT_Error_t publishToShadowAction(const char * pThingName, ShadowActions_t action, const char *pJsonDocumentToBeSent) {
ampembeng 15:6f2798e45099 382 IoT_Error_t ret_val = NONE_ERROR;
ampembeng 15:6f2798e45099 383 char TemporaryTopicName[MAX_SHADOW_TOPIC_LENGTH_BYTES];
ampembeng 15:6f2798e45099 384 topicNameFromThingAndAction(TemporaryTopicName, pThingName, action, SHADOW_ACTION);
ampembeng 15:6f2798e45099 385
ampembeng 15:6f2798e45099 386 MQTTPublishParams pubParams = MQTTPublishParamsDefault;
ampembeng 15:6f2798e45099 387 pubParams.pTopic = TemporaryTopicName;
ampembeng 15:6f2798e45099 388 MQTTMessageParams msgParams = MQTTMessageParamsDefault;
ampembeng 15:6f2798e45099 389 msgParams.qos = QOS_0;
ampembeng 15:6f2798e45099 390 msgParams.PayloadLen = strlen(pJsonDocumentToBeSent) + 1;
ampembeng 15:6f2798e45099 391 msgParams.pPayload = (char *) pJsonDocumentToBeSent;
ampembeng 15:6f2798e45099 392 pubParams.MessageParams = msgParams;
ampembeng 15:6f2798e45099 393 ret_val = pMqttClient->publish(&pubParams);
ampembeng 15:6f2798e45099 394
ampembeng 15:6f2798e45099 395 return ret_val;
ampembeng 15:6f2798e45099 396 }
ampembeng 15:6f2798e45099 397
ampembeng 15:6f2798e45099 398 bool getNextFreeIndexOfAckWaitList(uint8_t *pIndex) {
ampembeng 15:6f2798e45099 399 uint8_t i;
ampembeng 15:6f2798e45099 400 if (pIndex != NULL) {
ampembeng 15:6f2798e45099 401 for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
ampembeng 15:6f2798e45099 402 if (AckWaitList[i].isFree) {
ampembeng 15:6f2798e45099 403 *pIndex = i;
ampembeng 15:6f2798e45099 404 return true;
ampembeng 15:6f2798e45099 405 }
ampembeng 15:6f2798e45099 406 }
ampembeng 15:6f2798e45099 407 }
ampembeng 15:6f2798e45099 408 return false;
ampembeng 15:6f2798e45099 409 }
ampembeng 15:6f2798e45099 410
ampembeng 15:6f2798e45099 411 void addToAckWaitList(uint8_t indexAckWaitList, const char *pThingName, ShadowActions_t action,
ampembeng 15:6f2798e45099 412 const char *pExtractedClientToken, fpActionCallback_t callback, void *pCallbackContext,
ampembeng 15:6f2798e45099 413 uint32_t timeout_seconds) {
ampembeng 15:6f2798e45099 414 AckWaitList[indexAckWaitList].callback = callback;
ampembeng 15:6f2798e45099 415 strncpy(AckWaitList[indexAckWaitList].clientTokenID, pExtractedClientToken, MAX_SIZE_CLIENT_ID_WITH_SEQUENCE);
ampembeng 15:6f2798e45099 416 strncpy(AckWaitList[indexAckWaitList].thingName, pThingName, MAX_SIZE_OF_THING_NAME);
ampembeng 15:6f2798e45099 417 AckWaitList[indexAckWaitList].pCallbackContext = pCallbackContext;
ampembeng 15:6f2798e45099 418 AckWaitList[indexAckWaitList].action = action;
ampembeng 15:6f2798e45099 419 InitTimer(&(AckWaitList[indexAckWaitList].timer));
ampembeng 15:6f2798e45099 420 countdown(&(AckWaitList[indexAckWaitList].timer), timeout_seconds);
ampembeng 15:6f2798e45099 421 AckWaitList[indexAckWaitList].isFree = false;
ampembeng 15:6f2798e45099 422 }
ampembeng 15:6f2798e45099 423
ampembeng 15:6f2798e45099 424 void HandleExpiredResponseCallbacks(void) {
ampembeng 15:6f2798e45099 425 uint8_t i;
ampembeng 15:6f2798e45099 426 for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
ampembeng 15:6f2798e45099 427 if (!AckWaitList[i].isFree) {
ampembeng 15:6f2798e45099 428 if (expired(&(AckWaitList[i].timer))) {
ampembeng 15:6f2798e45099 429 if (AckWaitList[i].callback != NULL) {
ampembeng 15:6f2798e45099 430 AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, SHADOW_ACK_TIMEOUT,
ampembeng 15:6f2798e45099 431 shadowRxBuf, AckWaitList[i].pCallbackContext);
ampembeng 15:6f2798e45099 432 }
ampembeng 15:6f2798e45099 433 AckWaitList[i].isFree = true;
ampembeng 15:6f2798e45099 434 unsubscribeFromAcceptedAndRejected(i);
ampembeng 15:6f2798e45099 435 }
ampembeng 15:6f2798e45099 436 }
ampembeng 15:6f2798e45099 437 }
ampembeng 15:6f2798e45099 438 }
ampembeng 15:6f2798e45099 439
ampembeng 15:6f2798e45099 440 static int shadow_delta_callback(MQTTCallbackParams params) {
ampembeng 15:6f2798e45099 441
ampembeng 15:6f2798e45099 442 int32_t tokenCount;
ampembeng 15:6f2798e45099 443 uint32_t i = 0;
ampembeng 15:6f2798e45099 444 void *pJsonHandler;
ampembeng 15:6f2798e45099 445 int32_t DataPosition;
ampembeng 15:6f2798e45099 446 uint32_t dataLength;
ampembeng 15:6f2798e45099 447
ampembeng 15:6f2798e45099 448 if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) {
ampembeng 15:6f2798e45099 449 return GENERIC_ERROR;
ampembeng 15:6f2798e45099 450 }
ampembeng 15:6f2798e45099 451
ampembeng 15:6f2798e45099 452 memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen);
ampembeng 15:6f2798e45099 453 shadowRxBuf[params.MessageParams.PayloadLen] = '\0'; // jsmn_parse relies on a string
ampembeng 15:6f2798e45099 454
ampembeng 15:6f2798e45099 455 if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) {
ampembeng 15:6f2798e45099 456 WARN("Received JSON is not valid");
ampembeng 15:6f2798e45099 457 return GENERIC_ERROR;
ampembeng 15:6f2798e45099 458 }
ampembeng 15:6f2798e45099 459
ampembeng 15:6f2798e45099 460 if (shadowDiscardOldDeltaFlag) {
ampembeng 15:6f2798e45099 461 uint32_t tempVersionNumber = 0;
ampembeng 15:6f2798e45099 462 if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) {
ampembeng 15:6f2798e45099 463 if (tempVersionNumber > shadowJsonVersionNum) {
ampembeng 15:6f2798e45099 464 shadowJsonVersionNum = tempVersionNumber;
ampembeng 15:6f2798e45099 465 DEBUG("New Version number: %d", shadowJsonVersionNum);
ampembeng 15:6f2798e45099 466 } else {
ampembeng 15:6f2798e45099 467 WARN("Old Delta Message received - Ignoring rx: %d local: %d", tempVersionNumber, shadowJsonVersionNum);
ampembeng 15:6f2798e45099 468 return GENERIC_ERROR;
ampembeng 15:6f2798e45099 469 }
ampembeng 15:6f2798e45099 470 }
ampembeng 15:6f2798e45099 471 }
ampembeng 15:6f2798e45099 472
ampembeng 15:6f2798e45099 473 for (i = 0; i < tokenTableIndex; i++) {
ampembeng 15:6f2798e45099 474 if (!tokenTable[i].isFree) {
ampembeng 15:6f2798e45099 475 if (isJsonKeyMatchingAndUpdateValue(shadowRxBuf, pJsonHandler, tokenCount, (jsonStruct_t *)tokenTable[i].pStruct,
ampembeng 15:6f2798e45099 476 &dataLength, &DataPosition)) {
ampembeng 15:6f2798e45099 477 if (tokenTable[i].callback != NULL) {
ampembeng 15:6f2798e45099 478 tokenTable[i].callback(shadowRxBuf + DataPosition, dataLength, (jsonStruct_t *)tokenTable[i].pStruct);
ampembeng 15:6f2798e45099 479 }
ampembeng 15:6f2798e45099 480 }
ampembeng 15:6f2798e45099 481 }
ampembeng 15:6f2798e45099 482 }
ampembeng 15:6f2798e45099 483
ampembeng 15:6f2798e45099 484 return NONE_ERROR;
ampembeng 15:6f2798e45099 485 }
ampembeng 15:6f2798e45099 486