V.06 11/3

Dependencies:   FT6206 SDFileSystem SPI_TFT_ILI9341 TFT_fonts

Fork of ATT_AWS_IoT_demo by attiot

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers aws_iot_shadow_records.cpp Source File

aws_iot_shadow_records.cpp

00001 /*
00002  * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License").
00005  * You may not use this file except in compliance with the License.
00006  * A copy of the License is located at
00007  *
00008  *  http://aws.amazon.com/apache2.0
00009  *
00010  * or in the "license" file accompanying this file. This file is distributed
00011  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
00012  * express or implied. See the License for the specific language governing
00013  * permissions and limitations under the License.
00014  */
00015 
00016 #include "aws_iot_shadow_records.h"
00017 
00018 #include <string.h>
00019 #include <stdio.h>
00020 
00021 #include "timer_interface.h"
00022 #include "aws_iot_json_utils.h"
00023 #include "aws_iot_log.h"
00024 #include "aws_iot_shadow_json.h"
00025 #include "aws_iot_config.h"
00026 
00027 typedef struct {
00028     char clientTokenID[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE];
00029     char thingName[MAX_SIZE_OF_THING_NAME];
00030     ShadowActions_t action;
00031     fpActionCallback_t callback;
00032     void *pCallbackContext;
00033     bool isFree;
00034     Timer timer;
00035 } ToBeReceivedAckRecord_t;
00036 
00037 typedef struct {
00038     const char *pKey;
00039     void *pStruct;
00040     jsonStructCallback_t callback;
00041     bool isFree;
00042 } JsonTokenTable_t;
00043 
00044 typedef struct {
00045     char Topic[MAX_SHADOW_TOPIC_LENGTH_BYTES];
00046     uint8_t count;
00047     bool isFree;
00048     bool isSticky;
00049 } SubscriptionRecord_t;
00050 
00051 typedef enum {
00052     SHADOW_ACCEPTED, SHADOW_REJECTED, SHADOW_ACTION
00053 } ShadowAckTopicTypes_t;
00054 
00055 ToBeReceivedAckRecord_t AckWaitList[MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME];
00056 
00057 MQTTClient_t *pMqttClient;
00058 
00059 char myThingName[MAX_SIZE_OF_THING_NAME];
00060 char mqttClientID[MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES];
00061 
00062 char shadowDeltaTopic[MAX_SHADOW_TOPIC_LENGTH_BYTES];
00063 
00064 #define MAX_TOPICS_AT_ANY_GIVEN_TIME 2*MAX_THINGNAME_HANDLED_AT_ANY_GIVEN_TIME
00065 SubscriptionRecord_t SubscriptionList[MAX_TOPICS_AT_ANY_GIVEN_TIME];
00066 
00067 #define SUBSCRIBE_SETTLING_TIME 2
00068 char shadowRxBuf[SHADOW_MAX_SIZE_OF_RX_BUFFER];
00069 
00070 static JsonTokenTable_t tokenTable[MAX_JSON_TOKEN_EXPECTED];
00071 static uint32_t tokenTableIndex = 0;
00072 static bool deltaTopicSubscribedFlag = false;
00073 uint32_t shadowJsonVersionNum = 0;
00074 bool shadowDiscardOldDeltaFlag = true;
00075 
00076 // local helper functions
00077 static int AckStatusCallback(MQTTCallbackParams params);
00078 static int shadow_delta_callback(MQTTCallbackParams params);
00079 static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action,
00080         ShadowAckTopicTypes_t ackType);
00081 static int16_t getNextFreeIndexOfSubscriptionList(void);
00082 static void unsubscribeFromAcceptedAndRejected(uint8_t index);
00083 
00084 void initDeltaTokens(void) {
00085     uint32_t i;
00086     for (i = 0; i < MAX_JSON_TOKEN_EXPECTED; i++) {
00087         tokenTable[i].isFree = true;
00088     }
00089     tokenTableIndex = 0;
00090     deltaTopicSubscribedFlag = false;
00091 }
00092 
00093 IoT_Error_t registerJsonTokenOnDelta(jsonStruct_t *pStruct) {
00094 
00095     IoT_Error_t rc = NONE_ERROR;
00096 
00097     if (!deltaTopicSubscribedFlag) {
00098         MQTTSubscribeParams subParams;
00099         subParams.mHandler = shadow_delta_callback;
00100         snprintf(shadowDeltaTopic,MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/update/delta", myThingName);
00101         subParams.pTopic = shadowDeltaTopic;
00102         subParams.qos = QOS_0;
00103         rc = pMqttClient->subscribe(&subParams);
00104         DEBUG("delta topic %s", shadowDeltaTopic);
00105         deltaTopicSubscribedFlag = true;
00106     }
00107 
00108     if (tokenTableIndex >= MAX_JSON_TOKEN_EXPECTED) {
00109         return GENERIC_ERROR;
00110     }
00111 
00112     tokenTable[tokenTableIndex].pKey = pStruct->pKey;
00113     tokenTable[tokenTableIndex].callback = pStruct->cb;
00114     tokenTable[tokenTableIndex].pStruct = pStruct;
00115     tokenTable[tokenTableIndex].isFree = false;
00116     tokenTableIndex++;
00117 
00118     return rc;
00119 }
00120 
00121 static int16_t getNextFreeIndexOfSubscriptionList(void) {
00122     uint8_t i;
00123     for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
00124         if (SubscriptionList[i].isFree) {
00125             SubscriptionList[i].isFree = false;
00126             return i;
00127         }
00128     }
00129     return -1;
00130 }
00131 
00132 static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action,
00133         ShadowAckTopicTypes_t ackType) {
00134 
00135     char actionBuf[10];
00136     char ackTypeBuf[10];
00137 
00138     if (action == SHADOW_GET) {
00139         strcpy(actionBuf, "get");
00140     } else if (action == SHADOW_UPDATE) {
00141         strcpy(actionBuf, "update");
00142     } else if (action == SHADOW_DELETE) {
00143         strcpy(actionBuf, "delete");
00144     }
00145 
00146     if (ackType == SHADOW_ACCEPTED) {
00147         strcpy(ackTypeBuf, "accepted");
00148     } else if (ackType == SHADOW_REJECTED) {
00149         strcpy(ackTypeBuf, "rejected");
00150     }
00151 
00152     if (ackType == SHADOW_ACTION) {
00153         sprintf(pTopic, "$aws/things/%s/shadow/%s", pThingName, actionBuf);
00154     } else {
00155         sprintf(pTopic, "$aws/things/%s/shadow/%s/%s", pThingName, actionBuf, ackTypeBuf);
00156     }
00157 }
00158 
00159 static bool isAckForMyThingName(const char *pTopicName) {
00160     if (strstr(pTopicName, myThingName) != NULL && ((strstr(pTopicName, "get/accepted") != NULL) || (strstr(pTopicName, "delta") != NULL))) {
00161         return true;
00162     }
00163     return false;
00164 }
00165 
00166 static int AckStatusCallback(MQTTCallbackParams params) {
00167     int32_t tokenCount;
00168     int32_t i;
00169     void *pJsonHandler;
00170     char temporaryClientToken[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE];
00171 
00172     if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) {
00173         return GENERIC_ERROR;
00174     }
00175 
00176     memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen);
00177     shadowRxBuf[params.MessageParams.PayloadLen] = '\0';    // jsmn_parse relies on a string
00178 
00179     if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) {
00180         WARN("Received JSON is not valid");
00181         return GENERIC_ERROR;
00182     }
00183 
00184     if (isAckForMyThingName(params.pTopicName)) {
00185         uint32_t tempVersionNumber = 0;
00186         if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) {
00187             if (tempVersionNumber > shadowJsonVersionNum) {
00188                 shadowJsonVersionNum = tempVersionNumber;
00189             }
00190         }
00191     }
00192 
00193     if (extractClientToken(shadowRxBuf, temporaryClientToken)) {
00194         for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
00195             if (!AckWaitList[i].isFree) {
00196                 if (strcmp(AckWaitList[i].clientTokenID, temporaryClientToken) == 0) {
00197                     Shadow_Ack_Status_t status;
00198                     if (strstr(params.pTopicName, "accepted") != NULL) {
00199                         status = SHADOW_ACK_ACCEPTED;
00200                     } else if (strstr(params.pTopicName, "rejected") != NULL) {
00201                         status = SHADOW_ACK_REJECTED;
00202                     }
00203                     if (status == SHADOW_ACK_ACCEPTED || status == SHADOW_ACK_REJECTED) {
00204                         if (AckWaitList[i].callback != NULL) {
00205                             AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, status,
00206                                     shadowRxBuf, AckWaitList[i].pCallbackContext);
00207                         }
00208                         unsubscribeFromAcceptedAndRejected(i);
00209                         AckWaitList[i].isFree = true;
00210                         return NONE_ERROR;
00211                     }
00212                 }
00213             }
00214         }
00215     }
00216 
00217     return GENERIC_ERROR;
00218 }
00219 
00220 static int16_t findIndexOfSubscriptionList(const char *pTopic) {
00221     uint8_t i;
00222     for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
00223         if (!SubscriptionList[i].isFree) {
00224             if ((strcmp(pTopic, SubscriptionList[i].Topic) == 0)) {
00225                 return i;
00226             }
00227         }
00228     }
00229     return -1;
00230 }
00231 
00232 static void unsubscribeFromAcceptedAndRejected(uint8_t index) {
00233 
00234     char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
00235     char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
00236     IoT_Error_t ret_val = NONE_ERROR;
00237 
00238     topicNameFromThingAndAction(TemporaryTopicNameAccepted, AckWaitList[index].thingName, AckWaitList[index].action,
00239             SHADOW_ACCEPTED);
00240     topicNameFromThingAndAction(TemporaryTopicNameRejected, AckWaitList[index].thingName, AckWaitList[index].action,
00241             SHADOW_REJECTED);
00242 
00243     int16_t indexSubList;
00244 
00245     indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameAccepted);
00246     if ((indexSubList >= 0)) {
00247         if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) {
00248             ret_val = pMqttClient->unsubscribe(TemporaryTopicNameAccepted);
00249             if (ret_val == NONE_ERROR) {
00250                 SubscriptionList[indexSubList].isFree = true;
00251             }
00252         } else if (SubscriptionList[indexSubList].count > 1) {
00253             SubscriptionList[indexSubList].count--;
00254         }
00255     }
00256 
00257     indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameRejected);
00258     if ((indexSubList >= 0)) {
00259         if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) {
00260             ret_val = pMqttClient->unsubscribe(TemporaryTopicNameRejected);
00261             if (ret_val == NONE_ERROR) {
00262                 SubscriptionList[indexSubList].isFree = true;
00263             }
00264         } else if (SubscriptionList[indexSubList].count > 1) {
00265             SubscriptionList[indexSubList].count--;
00266         }
00267     }
00268 }
00269 
00270 void initializeRecords(MQTTClient_t *pClient) {
00271     uint8_t i;
00272     for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
00273         AckWaitList[i].isFree = true;
00274     }
00275     for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
00276         SubscriptionList[i].isFree = true;
00277         SubscriptionList[i].count = 0;
00278         SubscriptionList[i].isSticky = false;
00279     }
00280     pMqttClient = pClient;
00281 }
00282 
00283 bool isSubscriptionPresent(const char *pThingName, ShadowActions_t action) {
00284 
00285     uint8_t i = 0;
00286     bool isAcceptedPresent = false;
00287     bool isRejectedPresent = false;
00288     char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
00289     char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
00290 
00291     topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED);
00292     topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED);
00293 
00294     for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
00295         if (!SubscriptionList[i].isFree) {
00296             if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)) {
00297                 isAcceptedPresent = true;
00298             } else if ((strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) {
00299                 isRejectedPresent = true;
00300             }
00301         }
00302     }
00303 
00304     if (isRejectedPresent && isAcceptedPresent) {
00305         return true;
00306     }
00307 
00308     return false;
00309 }
00310 
00311 IoT_Error_t subscribeToShadowActionAcks(const char *pThingName, ShadowActions_t action, bool isSticky) {
00312     IoT_Error_t ret_val = NONE_ERROR;
00313     MQTTSubscribeParams subParams = MQTTSubscribeParamsDefault;
00314 
00315     bool clearBothEntriesFromList = true;
00316     int16_t indexAcceptedSubList = 0;
00317     int16_t indexRejectedSubList = 0;
00318     indexAcceptedSubList = getNextFreeIndexOfSubscriptionList();
00319     indexRejectedSubList = getNextFreeIndexOfSubscriptionList();
00320 
00321     if (indexAcceptedSubList >= 0 && indexRejectedSubList >= 0) {
00322         topicNameFromThingAndAction(SubscriptionList[indexAcceptedSubList].Topic, pThingName, action, SHADOW_ACCEPTED);
00323         subParams.mHandler = AckStatusCallback;
00324         subParams.qos = QOS_0;
00325         subParams.pTopic = SubscriptionList[indexAcceptedSubList].Topic;
00326         ret_val = pMqttClient->subscribe(&subParams);
00327         if (ret_val == NONE_ERROR) {
00328             SubscriptionList[indexAcceptedSubList].count = 1;
00329             SubscriptionList[indexAcceptedSubList].isSticky = isSticky;
00330             topicNameFromThingAndAction(SubscriptionList[indexRejectedSubList].Topic, pThingName, action,
00331                     SHADOW_REJECTED);
00332             subParams.pTopic = SubscriptionList[indexRejectedSubList].Topic;
00333             ret_val = pMqttClient->subscribe(&subParams);
00334             if (ret_val == NONE_ERROR) {
00335                 SubscriptionList[indexRejectedSubList].count = 1;
00336                 SubscriptionList[indexRejectedSubList].isSticky = isSticky;
00337                 clearBothEntriesFromList = false;
00338 
00339                 // wait for SUBSCRIBE_SETTLING_TIME seconds to let the subscription take effect
00340                 Timer subSettlingtimer;
00341                 InitTimer(&subSettlingtimer);
00342                 countdown(&subSettlingtimer, SUBSCRIBE_SETTLING_TIME);
00343                 while(!expired(&subSettlingtimer));
00344 
00345             }
00346         }
00347     }
00348 
00349     if (clearBothEntriesFromList) {
00350         if (indexAcceptedSubList >= 0) {
00351             SubscriptionList[indexAcceptedSubList].isFree = true;
00352         } else if (indexRejectedSubList >= 0) {
00353             SubscriptionList[indexRejectedSubList].isFree = true;
00354         }
00355         if (SubscriptionList[indexAcceptedSubList].count == 1) {
00356             pMqttClient->unsubscribe(SubscriptionList[indexAcceptedSubList].Topic);
00357         }
00358     }
00359 
00360     return ret_val;
00361 }
00362 
00363 void incrementSubscriptionCnt(const char *pThingName, ShadowActions_t action, bool isSticky) {
00364     char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
00365     char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
00366     uint8_t i;
00367     topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED);
00368     topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED);
00369 
00370     for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
00371         if (!SubscriptionList[i].isFree) {
00372             if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)
00373                     || (strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) {
00374                 SubscriptionList[i].count++;
00375                 SubscriptionList[i].isSticky = isSticky;
00376             }
00377         }
00378     }
00379 }
00380 
00381 IoT_Error_t publishToShadowAction(const char * pThingName, ShadowActions_t action, const char *pJsonDocumentToBeSent) {
00382     IoT_Error_t ret_val = NONE_ERROR;
00383     char TemporaryTopicName[MAX_SHADOW_TOPIC_LENGTH_BYTES];
00384     topicNameFromThingAndAction(TemporaryTopicName, pThingName, action, SHADOW_ACTION);
00385 
00386     MQTTPublishParams pubParams = MQTTPublishParamsDefault;
00387     pubParams.pTopic = TemporaryTopicName;
00388     MQTTMessageParams msgParams = MQTTMessageParamsDefault;
00389     msgParams.qos = QOS_0;
00390     msgParams.PayloadLen = strlen(pJsonDocumentToBeSent) + 1;
00391     msgParams.pPayload = (char *) pJsonDocumentToBeSent;
00392     pubParams.MessageParams = msgParams;
00393     ret_val = pMqttClient->publish(&pubParams);
00394 
00395     return ret_val;
00396 }
00397 
00398 bool getNextFreeIndexOfAckWaitList(uint8_t *pIndex) {
00399     uint8_t i;
00400     if (pIndex != NULL) {
00401         for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
00402             if (AckWaitList[i].isFree) {
00403                 *pIndex = i;
00404                 return true;
00405             }
00406         }
00407     }
00408     return false;
00409 }
00410 
00411 void addToAckWaitList(uint8_t indexAckWaitList, const char *pThingName, ShadowActions_t action,
00412         const char *pExtractedClientToken, fpActionCallback_t callback, void *pCallbackContext,
00413         uint32_t timeout_seconds) {
00414     AckWaitList[indexAckWaitList].callback = callback;
00415     strncpy(AckWaitList[indexAckWaitList].clientTokenID, pExtractedClientToken, MAX_SIZE_CLIENT_ID_WITH_SEQUENCE);
00416     strncpy(AckWaitList[indexAckWaitList].thingName, pThingName, MAX_SIZE_OF_THING_NAME);
00417     AckWaitList[indexAckWaitList].pCallbackContext = pCallbackContext;
00418     AckWaitList[indexAckWaitList].action = action;
00419     InitTimer(&(AckWaitList[indexAckWaitList].timer));
00420     countdown(&(AckWaitList[indexAckWaitList].timer), timeout_seconds);
00421     AckWaitList[indexAckWaitList].isFree = false;
00422 }
00423 
00424 void HandleExpiredResponseCallbacks(void) {
00425     uint8_t i;
00426     for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
00427         if (!AckWaitList[i].isFree) {
00428             if (expired(&(AckWaitList[i].timer))) {
00429                 if (AckWaitList[i].callback != NULL) {
00430                     AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, SHADOW_ACK_TIMEOUT,
00431                             shadowRxBuf, AckWaitList[i].pCallbackContext);
00432                 }
00433                 AckWaitList[i].isFree = true;
00434                 unsubscribeFromAcceptedAndRejected(i);
00435             }
00436         }
00437     }
00438 }
00439 
00440 static int shadow_delta_callback(MQTTCallbackParams params) {
00441 
00442     int32_t tokenCount;
00443     uint32_t i = 0;
00444     void *pJsonHandler;
00445     int32_t DataPosition;
00446     uint32_t dataLength;
00447 
00448     if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) {
00449         return GENERIC_ERROR;
00450     }
00451 
00452     memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen);
00453     shadowRxBuf[params.MessageParams.PayloadLen] = '\0';    // jsmn_parse relies on a string
00454 
00455     if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) {
00456         WARN("Received JSON is not valid");
00457         return GENERIC_ERROR;
00458     }
00459 
00460     if (shadowDiscardOldDeltaFlag) {
00461         uint32_t tempVersionNumber = 0;
00462         if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) {
00463             if (tempVersionNumber > shadowJsonVersionNum) {
00464                 shadowJsonVersionNum = tempVersionNumber;
00465                 DEBUG("New Version number: %d", shadowJsonVersionNum);
00466             } else {
00467                 WARN("Old Delta Message received - Ignoring rx: %d local: %d", tempVersionNumber, shadowJsonVersionNum);
00468                 return GENERIC_ERROR;
00469             }
00470         }
00471     }
00472 
00473     for (i = 0; i < tokenTableIndex; i++) {
00474         if (!tokenTable[i].isFree) {
00475             if (isJsonKeyMatchingAndUpdateValue(shadowRxBuf, pJsonHandler, tokenCount, (jsonStruct_t *)tokenTable[i].pStruct,
00476                     &dataLength, &DataPosition)) {
00477                 if (tokenTable[i].callback != NULL) {
00478                     tokenTable[i].callback(shadowRxBuf + DataPosition, dataLength, (jsonStruct_t *)tokenTable[i].pStruct);
00479                 }
00480             }
00481         }
00482     }
00483 
00484     return NONE_ERROR;
00485 }
00486