Changes to enabled on-line compiler

Committer:
JMF
Date:
Wed May 30 20:59:51 2018 +0000
Revision:
0:082731ede69f
Initial commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
JMF 0:082731ede69f 1 /*
JMF 0:082731ede69f 2 * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
JMF 0:082731ede69f 3 *
JMF 0:082731ede69f 4 * Licensed under the Apache License, Version 2.0 (the "License").
JMF 0:082731ede69f 5 * You may not use this file except in compliance with the License.
JMF 0:082731ede69f 6 * A copy of the License is located at
JMF 0:082731ede69f 7 *
JMF 0:082731ede69f 8 * http://aws.amazon.com/apache2.0
JMF 0:082731ede69f 9 *
JMF 0:082731ede69f 10 * or in the "license" file accompanying this file. This file is distributed
JMF 0:082731ede69f 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
JMF 0:082731ede69f 12 * express or implied. See the License for the specific language governing
JMF 0:082731ede69f 13 * permissions and limitations under the License.
JMF 0:082731ede69f 14 */
JMF 0:082731ede69f 15
JMF 0:082731ede69f 16 /**
JMF 0:082731ede69f 17 * @file aws_iot_mqtt_client_subscribe.c
JMF 0:082731ede69f 18 * @brief MQTT client subscribe API definitions
JMF 0:082731ede69f 19 */
JMF 0:082731ede69f 20
JMF 0:082731ede69f 21 #ifdef __cplusplus
JMF 0:082731ede69f 22 extern "C" {
JMF 0:082731ede69f 23 #endif
JMF 0:082731ede69f 24
JMF 0:082731ede69f 25 #include "aws_iot_shadow_records.h"
JMF 0:082731ede69f 26
JMF 0:082731ede69f 27 #include <string.h>
JMF 0:082731ede69f 28 #include <stdio.h>
JMF 0:082731ede69f 29
JMF 0:082731ede69f 30 #include "timer_interface.h"
JMF 0:082731ede69f 31 #include "aws_iot_json_utils.h"
JMF 0:082731ede69f 32 #include "aws_iot_log.h"
JMF 0:082731ede69f 33 #include "aws_iot_shadow_json.h"
JMF 0:082731ede69f 34 #include "aws_iot_config.h"
JMF 0:082731ede69f 35
JMF 0:082731ede69f 36 typedef struct {
JMF 0:082731ede69f 37 char clientTokenID[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE];
JMF 0:082731ede69f 38 char thingName[MAX_SIZE_OF_THING_NAME];
JMF 0:082731ede69f 39 ShadowActions_t action;
JMF 0:082731ede69f 40 fpActionCallback_t callback;
JMF 0:082731ede69f 41 void *pCallbackContext;
JMF 0:082731ede69f 42 bool isFree;
JMF 0:082731ede69f 43 awsTimer timer;
JMF 0:082731ede69f 44 } ToBeReceivedAckRecord_t;
JMF 0:082731ede69f 45
JMF 0:082731ede69f 46 typedef struct {
JMF 0:082731ede69f 47 const char *pKey;
JMF 0:082731ede69f 48 void *pStruct;
JMF 0:082731ede69f 49 jsonStructCallback_t callback;
JMF 0:082731ede69f 50 bool isFree;
JMF 0:082731ede69f 51 } JsonTokenTable_t;
JMF 0:082731ede69f 52
JMF 0:082731ede69f 53 typedef struct {
JMF 0:082731ede69f 54 char Topic[MAX_SHADOW_TOPIC_LENGTH_BYTES];
JMF 0:082731ede69f 55 uint8_t count;
JMF 0:082731ede69f 56 bool isFree;
JMF 0:082731ede69f 57 bool isSticky;
JMF 0:082731ede69f 58 } SubscriptionRecord_t;
JMF 0:082731ede69f 59
/**
 * @brief Selects which topic variant topicNameFromThingAndAction() builds.
 */
typedef enum {
    SHADOW_ACCEPTED, /* ".../<action>/accepted" */
    SHADOW_REJECTED, /* ".../<action>/rejected" */
    SHADOW_ACTION    /* ".../<action>" (no ack suffix) */
} ShadowAckTopicTypes_t;
JMF 0:082731ede69f 63
JMF 0:082731ede69f 64 ToBeReceivedAckRecord_t AckWaitList[MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME];
JMF 0:082731ede69f 65
JMF 0:082731ede69f 66 AWS_IoT_Client *pMqttClient;
JMF 0:082731ede69f 67
JMF 0:082731ede69f 68 char myThingName[MAX_SIZE_OF_THING_NAME];
JMF 0:082731ede69f 69 char mqttClientID[MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES];
JMF 0:082731ede69f 70
JMF 0:082731ede69f 71 char shadowDeltaTopic[MAX_SHADOW_TOPIC_LENGTH_BYTES];
JMF 0:082731ede69f 72
JMF 0:082731ede69f 73 #define MAX_TOPICS_AT_ANY_GIVEN_TIME 2*MAX_THINGNAME_HANDLED_AT_ANY_GIVEN_TIME
JMF 0:082731ede69f 74 SubscriptionRecord_t SubscriptionList[MAX_TOPICS_AT_ANY_GIVEN_TIME];
JMF 0:082731ede69f 75
JMF 0:082731ede69f 76 #define SUBSCRIBE_SETTLING_TIME 2
JMF 0:082731ede69f 77 char shadowRxBuf[SHADOW_MAX_SIZE_OF_RX_BUFFER];
JMF 0:082731ede69f 78
JMF 0:082731ede69f 79 static JsonTokenTable_t tokenTable[MAX_JSON_TOKEN_EXPECTED];
JMF 0:082731ede69f 80 static uint32_t tokenTableIndex = 0;
JMF 0:082731ede69f 81 static bool deltaTopicSubscribedFlag = false;
JMF 0:082731ede69f 82 uint32_t shadowJsonVersionNum = 0;
JMF 0:082731ede69f 83 bool shadowDiscardOldDeltaFlag = true;
JMF 0:082731ede69f 84
JMF 0:082731ede69f 85 // local helper functions
JMF 0:082731ede69f 86 static void AckStatusCallback(AWS_IoT_Client *pClient, char *topicName,
JMF 0:082731ede69f 87 uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData);
JMF 0:082731ede69f 88
JMF 0:082731ede69f 89 static void shadow_delta_callback(AWS_IoT_Client *pClient, char *topicName,
JMF 0:082731ede69f 90 uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData);
JMF 0:082731ede69f 91
JMF 0:082731ede69f 92 static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action,
JMF 0:082731ede69f 93 ShadowAckTopicTypes_t ackType);
JMF 0:082731ede69f 94
JMF 0:082731ede69f 95 static int16_t getNextFreeIndexOfSubscriptionList(void);
JMF 0:082731ede69f 96
JMF 0:082731ede69f 97 static void unsubscribeFromAcceptedAndRejected(uint8_t index);
JMF 0:082731ede69f 98
JMF 0:082731ede69f 99 void initDeltaTokens(void) {
JMF 0:082731ede69f 100 uint32_t i;
JMF 0:082731ede69f 101 for(i = 0; i < MAX_JSON_TOKEN_EXPECTED; i++) {
JMF 0:082731ede69f 102 tokenTable[i].isFree = true;
JMF 0:082731ede69f 103 }
JMF 0:082731ede69f 104 tokenTableIndex = 0;
JMF 0:082731ede69f 105 deltaTopicSubscribedFlag = false;
JMF 0:082731ede69f 106 }
JMF 0:082731ede69f 107
JMF 0:082731ede69f 108 IoT_Error_t registerJsonTokenOnDelta(jsonStruct_t *pStruct) {
JMF 0:082731ede69f 109
JMF 0:082731ede69f 110 IoT_Error_t rc = AWS_SUCCESS;
JMF 0:082731ede69f 111
JMF 0:082731ede69f 112 if(!deltaTopicSubscribedFlag) {
JMF 0:082731ede69f 113 snprintf(shadowDeltaTopic, MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/update/delta", myThingName);
JMF 0:082731ede69f 114 rc = aws_iot_mqtt_subscribe(pMqttClient, shadowDeltaTopic, (uint16_t) strlen(shadowDeltaTopic), QOS0,
JMF 0:082731ede69f 115 shadow_delta_callback, NULL);
JMF 0:082731ede69f 116 deltaTopicSubscribedFlag = true;
JMF 0:082731ede69f 117 }
JMF 0:082731ede69f 118
JMF 0:082731ede69f 119 if(tokenTableIndex >= MAX_JSON_TOKEN_EXPECTED) {
JMF 0:082731ede69f 120 return FAILURE;
JMF 0:082731ede69f 121 }
JMF 0:082731ede69f 122
JMF 0:082731ede69f 123 tokenTable[tokenTableIndex].pKey = pStruct->pKey;
JMF 0:082731ede69f 124 tokenTable[tokenTableIndex].callback = pStruct->cb;
JMF 0:082731ede69f 125 tokenTable[tokenTableIndex].pStruct = pStruct;
JMF 0:082731ede69f 126 tokenTable[tokenTableIndex].isFree = false;
JMF 0:082731ede69f 127 tokenTableIndex++;
JMF 0:082731ede69f 128
JMF 0:082731ede69f 129 return rc;
JMF 0:082731ede69f 130 }
JMF 0:082731ede69f 131
JMF 0:082731ede69f 132 static int16_t getNextFreeIndexOfSubscriptionList(void) {
JMF 0:082731ede69f 133 uint8_t i;
JMF 0:082731ede69f 134 for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
JMF 0:082731ede69f 135 if(SubscriptionList[i].isFree) {
JMF 0:082731ede69f 136 SubscriptionList[i].isFree = false;
JMF 0:082731ede69f 137 return i;
JMF 0:082731ede69f 138 }
JMF 0:082731ede69f 139 }
JMF 0:082731ede69f 140 return -1;
JMF 0:082731ede69f 141 }
JMF 0:082731ede69f 142
JMF 0:082731ede69f 143 static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action,
JMF 0:082731ede69f 144 ShadowAckTopicTypes_t ackType) {
JMF 0:082731ede69f 145
JMF 0:082731ede69f 146 char actionBuf[10];
JMF 0:082731ede69f 147 char ackTypeBuf[10];
JMF 0:082731ede69f 148
JMF 0:082731ede69f 149 if(SHADOW_GET == action) {
JMF 0:082731ede69f 150 strncpy(actionBuf, "get", 10);
JMF 0:082731ede69f 151 } else if(SHADOW_UPDATE == action) {
JMF 0:082731ede69f 152 strncpy(actionBuf, "update", 10);
JMF 0:082731ede69f 153 } else if(SHADOW_DELETE == action) {
JMF 0:082731ede69f 154 strncpy(actionBuf, "delete", 10);
JMF 0:082731ede69f 155 }
JMF 0:082731ede69f 156
JMF 0:082731ede69f 157 if(SHADOW_ACCEPTED == ackType) {
JMF 0:082731ede69f 158 strncpy(ackTypeBuf, "accepted", 10);
JMF 0:082731ede69f 159 } else if(SHADOW_REJECTED == ackType) {
JMF 0:082731ede69f 160 strncpy(ackTypeBuf, "rejected", 10);
JMF 0:082731ede69f 161 }
JMF 0:082731ede69f 162
JMF 0:082731ede69f 163 if(SHADOW_ACTION == ackType) {
JMF 0:082731ede69f 164 snprintf(pTopic, MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/%s", pThingName, actionBuf);
JMF 0:082731ede69f 165 } else {
JMF 0:082731ede69f 166 snprintf(pTopic, MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/%s/%s", pThingName, actionBuf,
JMF 0:082731ede69f 167 ackTypeBuf);
JMF 0:082731ede69f 168 }
JMF 0:082731ede69f 169 }
JMF 0:082731ede69f 170
JMF 0:082731ede69f 171 static bool isValidShadowVersionUpdate(const char *pTopicName) {
JMF 0:082731ede69f 172 if(strstr(pTopicName, myThingName) != NULL &&
JMF 0:082731ede69f 173 ((strstr(pTopicName, "get/accepted") != NULL) ||
JMF 0:082731ede69f 174 (strstr(pTopicName, "delta") != NULL))) {
JMF 0:082731ede69f 175 return true;
JMF 0:082731ede69f 176 }
JMF 0:082731ede69f 177 return false;
JMF 0:082731ede69f 178 }
JMF 0:082731ede69f 179
JMF 0:082731ede69f 180 static void AckStatusCallback(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen,
JMF 0:082731ede69f 181 IoT_Publish_Message_Params *params, void *pData) {
JMF 0:082731ede69f 182 int32_t tokenCount;
JMF 0:082731ede69f 183 uint8_t i;
JMF 0:082731ede69f 184 void *pJsonHandler = NULL;
JMF 0:082731ede69f 185 char temporaryClientToken[MAX_SIZE_CLIENT_TOKEN_CLIENT_SEQUENCE];
JMF 0:082731ede69f 186
JMF 0:082731ede69f 187 IOT_UNUSED(pClient);
JMF 0:082731ede69f 188 IOT_UNUSED(topicNameLen);
JMF 0:082731ede69f 189 IOT_UNUSED(pData);
JMF 0:082731ede69f 190
JMF 0:082731ede69f 191 if(params->payloadLen >= SHADOW_MAX_SIZE_OF_RX_BUFFER) {
JMF 0:082731ede69f 192 IOT_WARN("Payload larger than RX Buffer");
JMF 0:082731ede69f 193 return;
JMF 0:082731ede69f 194 }
JMF 0:082731ede69f 195
JMF 0:082731ede69f 196 memcpy(shadowRxBuf, params->payload, params->payloadLen);
JMF 0:082731ede69f 197 shadowRxBuf[params->payloadLen] = '\0'; // jsmn_parse relies on a string
JMF 0:082731ede69f 198
JMF 0:082731ede69f 199 if(!isJsonValidAndParse(shadowRxBuf, SHADOW_MAX_SIZE_OF_RX_BUFFER, pJsonHandler, &tokenCount)) {
JMF 0:082731ede69f 200 IOT_WARN("Received JSON is not valid");
JMF 0:082731ede69f 201 return;
JMF 0:082731ede69f 202 }
JMF 0:082731ede69f 203
JMF 0:082731ede69f 204 if(isValidShadowVersionUpdate(topicName)) {
JMF 0:082731ede69f 205 uint32_t tempVersionNumber = 0;
JMF 0:082731ede69f 206 if(extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) {
JMF 0:082731ede69f 207 if(tempVersionNumber > shadowJsonVersionNum) {
JMF 0:082731ede69f 208 shadowJsonVersionNum = tempVersionNumber;
JMF 0:082731ede69f 209 }
JMF 0:082731ede69f 210 }
JMF 0:082731ede69f 211 }
JMF 0:082731ede69f 212
JMF 0:082731ede69f 213 if(extractClientToken(shadowRxBuf, SHADOW_MAX_SIZE_OF_RX_BUFFER, temporaryClientToken, MAX_SIZE_CLIENT_TOKEN_CLIENT_SEQUENCE)) {
JMF 0:082731ede69f 214 for(i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
JMF 0:082731ede69f 215 if(!AckWaitList[i].isFree) {
JMF 0:082731ede69f 216 if(strcmp(AckWaitList[i].clientTokenID, temporaryClientToken) == 0) {
JMF 0:082731ede69f 217 Shadow_Ack_Status_t status = SHADOW_ACK_REJECTED;
JMF 0:082731ede69f 218 if(strstr(topicName, "accepted") != NULL) {
JMF 0:082731ede69f 219 status = SHADOW_ACK_ACCEPTED;
JMF 0:082731ede69f 220 } else if(strstr(topicName, "rejected") != NULL) {
JMF 0:082731ede69f 221 status = SHADOW_ACK_REJECTED;
JMF 0:082731ede69f 222 }
JMF 0:082731ede69f 223 if(status == SHADOW_ACK_ACCEPTED || status == SHADOW_ACK_REJECTED) {
JMF 0:082731ede69f 224 if(AckWaitList[i].callback != NULL) {
JMF 0:082731ede69f 225 AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, status,
JMF 0:082731ede69f 226 shadowRxBuf, AckWaitList[i].pCallbackContext);
JMF 0:082731ede69f 227 }
JMF 0:082731ede69f 228 unsubscribeFromAcceptedAndRejected(i);
JMF 0:082731ede69f 229 AckWaitList[i].isFree = true;
JMF 0:082731ede69f 230 return;
JMF 0:082731ede69f 231 }
JMF 0:082731ede69f 232 }
JMF 0:082731ede69f 233 }
JMF 0:082731ede69f 234 }
JMF 0:082731ede69f 235 }
JMF 0:082731ede69f 236 }
JMF 0:082731ede69f 237
JMF 0:082731ede69f 238 static int16_t findIndexOfSubscriptionList(const char *pTopic) {
JMF 0:082731ede69f 239 uint8_t i;
JMF 0:082731ede69f 240 for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
JMF 0:082731ede69f 241 if(!SubscriptionList[i].isFree) {
JMF 0:082731ede69f 242 if((strcmp(pTopic, SubscriptionList[i].Topic) == 0)) {
JMF 0:082731ede69f 243 return i;
JMF 0:082731ede69f 244 }
JMF 0:082731ede69f 245 }
JMF 0:082731ede69f 246 }
JMF 0:082731ede69f 247 return -1;
JMF 0:082731ede69f 248 }
JMF 0:082731ede69f 249
JMF 0:082731ede69f 250 static void unsubscribeFromAcceptedAndRejected(uint8_t index) {
JMF 0:082731ede69f 251
JMF 0:082731ede69f 252 char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
JMF 0:082731ede69f 253 char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
JMF 0:082731ede69f 254 IoT_Error_t ret_val = AWS_SUCCESS;
JMF 0:082731ede69f 255
JMF 0:082731ede69f 256 int16_t indexSubList;
JMF 0:082731ede69f 257
JMF 0:082731ede69f 258 topicNameFromThingAndAction(TemporaryTopicNameAccepted, AckWaitList[index].thingName, AckWaitList[index].action,
JMF 0:082731ede69f 259 SHADOW_ACCEPTED);
JMF 0:082731ede69f 260 topicNameFromThingAndAction(TemporaryTopicNameRejected, AckWaitList[index].thingName, AckWaitList[index].action,
JMF 0:082731ede69f 261 SHADOW_REJECTED);
JMF 0:082731ede69f 262
JMF 0:082731ede69f 263 indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameAccepted);
JMF 0:082731ede69f 264 if((indexSubList >= 0)) {
JMF 0:082731ede69f 265 if(!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) {
JMF 0:082731ede69f 266 ret_val = aws_iot_mqtt_unsubscribe(pMqttClient, TemporaryTopicNameAccepted,
JMF 0:082731ede69f 267 (uint16_t) strlen(TemporaryTopicNameAccepted));
JMF 0:082731ede69f 268 if(ret_val == AWS_SUCCESS) {
JMF 0:082731ede69f 269 SubscriptionList[indexSubList].isFree = true;
JMF 0:082731ede69f 270 }
JMF 0:082731ede69f 271 } else if(SubscriptionList[indexSubList].count > 1) {
JMF 0:082731ede69f 272 SubscriptionList[indexSubList].count--;
JMF 0:082731ede69f 273 }
JMF 0:082731ede69f 274 }
JMF 0:082731ede69f 275
JMF 0:082731ede69f 276 indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameRejected);
JMF 0:082731ede69f 277 if((indexSubList >= 0)) {
JMF 0:082731ede69f 278 if(!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) {
JMF 0:082731ede69f 279 ret_val = aws_iot_mqtt_unsubscribe(pMqttClient, TemporaryTopicNameRejected,
JMF 0:082731ede69f 280 (uint16_t) strlen(TemporaryTopicNameRejected));
JMF 0:082731ede69f 281 if(ret_val == AWS_SUCCESS) {
JMF 0:082731ede69f 282 SubscriptionList[indexSubList].isFree = true;
JMF 0:082731ede69f 283 }
JMF 0:082731ede69f 284 } else if(SubscriptionList[indexSubList].count > 1) {
JMF 0:082731ede69f 285 SubscriptionList[indexSubList].count--;
JMF 0:082731ede69f 286 }
JMF 0:082731ede69f 287 }
JMF 0:082731ede69f 288 }
JMF 0:082731ede69f 289
JMF 0:082731ede69f 290 void initializeRecords(AWS_IoT_Client *pClient) {
JMF 0:082731ede69f 291 uint8_t i;
JMF 0:082731ede69f 292 for(i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
JMF 0:082731ede69f 293 AckWaitList[i].isFree = true;
JMF 0:082731ede69f 294 }
JMF 0:082731ede69f 295 for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
JMF 0:082731ede69f 296 SubscriptionList[i].isFree = true;
JMF 0:082731ede69f 297 SubscriptionList[i].count = 0;
JMF 0:082731ede69f 298 SubscriptionList[i].isSticky = false;
JMF 0:082731ede69f 299 }
JMF 0:082731ede69f 300
JMF 0:082731ede69f 301 pMqttClient = pClient;
JMF 0:082731ede69f 302 }
JMF 0:082731ede69f 303
JMF 0:082731ede69f 304 bool isSubscriptionPresent(const char *pThingName, ShadowActions_t action) {
JMF 0:082731ede69f 305
JMF 0:082731ede69f 306 uint8_t i = 0;
JMF 0:082731ede69f 307 bool isAcceptedPresent = false;
JMF 0:082731ede69f 308 bool isRejectedPresent = false;
JMF 0:082731ede69f 309 char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
JMF 0:082731ede69f 310 char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
JMF 0:082731ede69f 311
JMF 0:082731ede69f 312 topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED);
JMF 0:082731ede69f 313 topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED);
JMF 0:082731ede69f 314
JMF 0:082731ede69f 315 for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
JMF 0:082731ede69f 316 if(!SubscriptionList[i].isFree) {
JMF 0:082731ede69f 317 if((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)) {
JMF 0:082731ede69f 318 isAcceptedPresent = true;
JMF 0:082731ede69f 319 } else if((strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) {
JMF 0:082731ede69f 320 isRejectedPresent = true;
JMF 0:082731ede69f 321 }
JMF 0:082731ede69f 322 }
JMF 0:082731ede69f 323 }
JMF 0:082731ede69f 324
JMF 0:082731ede69f 325 if(isRejectedPresent && isAcceptedPresent) {
JMF 0:082731ede69f 326 return true;
JMF 0:082731ede69f 327 }
JMF 0:082731ede69f 328
JMF 0:082731ede69f 329 return false;
JMF 0:082731ede69f 330 }
JMF 0:082731ede69f 331
JMF 0:082731ede69f 332 IoT_Error_t subscribeToShadowActionAcks(const char *pThingName, ShadowActions_t action, bool isSticky) {
JMF 0:082731ede69f 333 IoT_Error_t ret_val = AWS_SUCCESS;
JMF 0:082731ede69f 334
JMF 0:082731ede69f 335 bool clearBothEntriesFromList = true;
JMF 0:082731ede69f 336 int16_t indexAcceptedSubList = 0;
JMF 0:082731ede69f 337 int16_t indexRejectedSubList = 0;
JMF 0:082731ede69f 338 awsTimer subSettlingtimer;
JMF 0:082731ede69f 339 indexAcceptedSubList = getNextFreeIndexOfSubscriptionList();
JMF 0:082731ede69f 340 indexRejectedSubList = getNextFreeIndexOfSubscriptionList();
JMF 0:082731ede69f 341
JMF 0:082731ede69f 342 if(indexAcceptedSubList >= 0 && indexRejectedSubList >= 0) {
JMF 0:082731ede69f 343 topicNameFromThingAndAction(SubscriptionList[indexAcceptedSubList].Topic, pThingName, action, SHADOW_ACCEPTED);
JMF 0:082731ede69f 344 ret_val = aws_iot_mqtt_subscribe(pMqttClient, SubscriptionList[indexAcceptedSubList].Topic,
JMF 0:082731ede69f 345 (uint16_t) strlen(SubscriptionList[indexAcceptedSubList].Topic), QOS0,
JMF 0:082731ede69f 346 AckStatusCallback, NULL);
JMF 0:082731ede69f 347 if(ret_val == AWS_SUCCESS) {
JMF 0:082731ede69f 348 SubscriptionList[indexAcceptedSubList].count = 1;
JMF 0:082731ede69f 349 SubscriptionList[indexAcceptedSubList].isSticky = isSticky;
JMF 0:082731ede69f 350 topicNameFromThingAndAction(SubscriptionList[indexRejectedSubList].Topic, pThingName, action,
JMF 0:082731ede69f 351 SHADOW_REJECTED);
JMF 0:082731ede69f 352 ret_val = aws_iot_mqtt_subscribe(pMqttClient, SubscriptionList[indexRejectedSubList].Topic,
JMF 0:082731ede69f 353 (uint16_t) strlen(SubscriptionList[indexRejectedSubList].Topic), QOS0,
JMF 0:082731ede69f 354 AckStatusCallback, NULL);
JMF 0:082731ede69f 355 if(ret_val == AWS_SUCCESS) {
JMF 0:082731ede69f 356 SubscriptionList[indexRejectedSubList].count = 1;
JMF 0:082731ede69f 357 SubscriptionList[indexRejectedSubList].isSticky = isSticky;
JMF 0:082731ede69f 358 clearBothEntriesFromList = false;
JMF 0:082731ede69f 359
JMF 0:082731ede69f 360 // wait for SUBSCRIBE_SETTLING_TIME seconds to let the subscription take effect
JMF 0:082731ede69f 361 init_timer(&subSettlingtimer);
JMF 0:082731ede69f 362 countdown_sec(&subSettlingtimer, SUBSCRIBE_SETTLING_TIME);
JMF 0:082731ede69f 363 while(!has_timer_expired(&subSettlingtimer));
JMF 0:082731ede69f 364
JMF 0:082731ede69f 365 }
JMF 0:082731ede69f 366 }
JMF 0:082731ede69f 367 }
JMF 0:082731ede69f 368
JMF 0:082731ede69f 369 if(clearBothEntriesFromList) {
JMF 0:082731ede69f 370 if(indexAcceptedSubList >= 0) {
JMF 0:082731ede69f 371 SubscriptionList[indexAcceptedSubList].isFree = true;
JMF 0:082731ede69f 372
JMF 0:082731ede69f 373 if(SubscriptionList[indexAcceptedSubList].count == 1) {
JMF 0:082731ede69f 374 aws_iot_mqtt_unsubscribe(pMqttClient, SubscriptionList[indexAcceptedSubList].Topic,
JMF 0:082731ede69f 375 (uint16_t) strlen(SubscriptionList[indexAcceptedSubList].Topic));
JMF 0:082731ede69f 376 }
JMF 0:082731ede69f 377 }
JMF 0:082731ede69f 378 if(indexRejectedSubList >= 0) {
JMF 0:082731ede69f 379 SubscriptionList[indexRejectedSubList].isFree = true;
JMF 0:082731ede69f 380 }
JMF 0:082731ede69f 381
JMF 0:082731ede69f 382 }
JMF 0:082731ede69f 383
JMF 0:082731ede69f 384 return ret_val;
JMF 0:082731ede69f 385 }
JMF 0:082731ede69f 386
JMF 0:082731ede69f 387 void incrementSubscriptionCnt(const char *pThingName, ShadowActions_t action, bool isSticky) {
JMF 0:082731ede69f 388 char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
JMF 0:082731ede69f 389 char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
JMF 0:082731ede69f 390 uint8_t i;
JMF 0:082731ede69f 391 topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED);
JMF 0:082731ede69f 392 topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED);
JMF 0:082731ede69f 393
JMF 0:082731ede69f 394 for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
JMF 0:082731ede69f 395 if(!SubscriptionList[i].isFree) {
JMF 0:082731ede69f 396 if((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)
JMF 0:082731ede69f 397 || (strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) {
JMF 0:082731ede69f 398 SubscriptionList[i].count++;
JMF 0:082731ede69f 399 SubscriptionList[i].isSticky = isSticky;
JMF 0:082731ede69f 400 }
JMF 0:082731ede69f 401 }
JMF 0:082731ede69f 402 }
JMF 0:082731ede69f 403 }
JMF 0:082731ede69f 404
JMF 0:082731ede69f 405 IoT_Error_t publishToShadowAction(const char *pThingName, ShadowActions_t action, const char *pJsonDocumentToBeSent) {
JMF 0:082731ede69f 406 IoT_Error_t ret_val = AWS_SUCCESS;
JMF 0:082731ede69f 407 char TemporaryTopicName[MAX_SHADOW_TOPIC_LENGTH_BYTES];
JMF 0:082731ede69f 408 IoT_Publish_Message_Params msgParams;
JMF 0:082731ede69f 409
JMF 0:082731ede69f 410 if(NULL == pThingName || NULL == pJsonDocumentToBeSent) {
JMF 0:082731ede69f 411 return NULL_VALUE_ERROR;
JMF 0:082731ede69f 412 }
JMF 0:082731ede69f 413
JMF 0:082731ede69f 414 topicNameFromThingAndAction(TemporaryTopicName, pThingName, action, SHADOW_ACTION);
JMF 0:082731ede69f 415
JMF 0:082731ede69f 416 msgParams.qos = QOS0;
JMF 0:082731ede69f 417 msgParams.isRetained = 0;
JMF 0:082731ede69f 418 msgParams.payloadLen = strlen(pJsonDocumentToBeSent);
JMF 0:082731ede69f 419 msgParams.payload = (char *) pJsonDocumentToBeSent;
JMF 0:082731ede69f 420 ret_val = aws_iot_mqtt_publish(pMqttClient, TemporaryTopicName, (uint16_t) strlen(TemporaryTopicName), &msgParams);
JMF 0:082731ede69f 421
JMF 0:082731ede69f 422 return ret_val;
JMF 0:082731ede69f 423 }
JMF 0:082731ede69f 424
JMF 0:082731ede69f 425 bool getNextFreeIndexOfAckWaitList(uint8_t *pIndex) {
JMF 0:082731ede69f 426 uint8_t i;
JMF 0:082731ede69f 427 bool rc = false;
JMF 0:082731ede69f 428
JMF 0:082731ede69f 429 if(NULL == pIndex) {
JMF 0:082731ede69f 430 return false;
JMF 0:082731ede69f 431 }
JMF 0:082731ede69f 432
JMF 0:082731ede69f 433 for(i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
JMF 0:082731ede69f 434 if(AckWaitList[i].isFree) {
JMF 0:082731ede69f 435 *pIndex = i;
JMF 0:082731ede69f 436 rc = true;
JMF 0:082731ede69f 437 break;
JMF 0:082731ede69f 438 }
JMF 0:082731ede69f 439 }
JMF 0:082731ede69f 440
JMF 0:082731ede69f 441 return rc;
JMF 0:082731ede69f 442 }
JMF 0:082731ede69f 443
JMF 0:082731ede69f 444 void addToAckWaitList(uint8_t indexAckWaitList, const char *pThingName, ShadowActions_t action,
JMF 0:082731ede69f 445 const char *pExtractedClientToken, fpActionCallback_t callback, void *pCallbackContext,
JMF 0:082731ede69f 446 uint32_t timeout_seconds) {
JMF 0:082731ede69f 447 AckWaitList[indexAckWaitList].callback = callback;
JMF 0:082731ede69f 448 memcpy(AckWaitList[indexAckWaitList].clientTokenID, pExtractedClientToken, MAX_SIZE_CLIENT_ID_WITH_SEQUENCE);
JMF 0:082731ede69f 449 memcpy(AckWaitList[indexAckWaitList].thingName, pThingName, MAX_SIZE_OF_THING_NAME);
JMF 0:082731ede69f 450 AckWaitList[indexAckWaitList].pCallbackContext = pCallbackContext;
JMF 0:082731ede69f 451 AckWaitList[indexAckWaitList].action = action;
JMF 0:082731ede69f 452 init_timer(&(AckWaitList[indexAckWaitList].timer));
JMF 0:082731ede69f 453 countdown_sec(&(AckWaitList[indexAckWaitList].timer), timeout_seconds);
JMF 0:082731ede69f 454 AckWaitList[indexAckWaitList].isFree = false;
JMF 0:082731ede69f 455 }
JMF 0:082731ede69f 456
JMF 0:082731ede69f 457 void HandleExpiredResponseCallbacks(void) {
JMF 0:082731ede69f 458 uint8_t i;
JMF 0:082731ede69f 459 for(i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
JMF 0:082731ede69f 460 if(!AckWaitList[i].isFree) {
JMF 0:082731ede69f 461 if(has_timer_expired(&(AckWaitList[i].timer))) {
JMF 0:082731ede69f 462 if(AckWaitList[i].callback != NULL) {
JMF 0:082731ede69f 463 AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, SHADOW_ACK_TIMEOUT,
JMF 0:082731ede69f 464 shadowRxBuf, AckWaitList[i].pCallbackContext);
JMF 0:082731ede69f 465 }
JMF 0:082731ede69f 466 AckWaitList[i].isFree = true;
JMF 0:082731ede69f 467 unsubscribeFromAcceptedAndRejected(i);
JMF 0:082731ede69f 468 }
JMF 0:082731ede69f 469 }
JMF 0:082731ede69f 470 }
JMF 0:082731ede69f 471 }
JMF 0:082731ede69f 472
JMF 0:082731ede69f 473 static void shadow_delta_callback(AWS_IoT_Client *pClient, char *topicName,
JMF 0:082731ede69f 474 uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData) {
JMF 0:082731ede69f 475 int32_t tokenCount;
JMF 0:082731ede69f 476 uint32_t i = 0;
JMF 0:082731ede69f 477 void *pJsonHandler = NULL;
JMF 0:082731ede69f 478 int32_t DataPosition;
JMF 0:082731ede69f 479 uint32_t dataLength;
JMF 0:082731ede69f 480 uint32_t tempVersionNumber = 0;
JMF 0:082731ede69f 481
JMF 0:082731ede69f 482 FUNC_ENTRY;
JMF 0:082731ede69f 483
JMF 0:082731ede69f 484 IOT_UNUSED(pClient);
JMF 0:082731ede69f 485 IOT_UNUSED(topicName);
JMF 0:082731ede69f 486 IOT_UNUSED(topicNameLen);
JMF 0:082731ede69f 487 IOT_UNUSED(pData);
JMF 0:082731ede69f 488
JMF 0:082731ede69f 489 if(params->payloadLen >= SHADOW_MAX_SIZE_OF_RX_BUFFER) {
JMF 0:082731ede69f 490 IOT_WARN("Payload larger than RX Buffer");
JMF 0:082731ede69f 491 return;
JMF 0:082731ede69f 492 }
JMF 0:082731ede69f 493
JMF 0:082731ede69f 494 memcpy(shadowRxBuf, params->payload, params->payloadLen);
JMF 0:082731ede69f 495 shadowRxBuf[params->payloadLen] = '\0'; // jsmn_parse relies on a string
JMF 0:082731ede69f 496
JMF 0:082731ede69f 497 if(!isJsonValidAndParse(shadowRxBuf, SHADOW_MAX_SIZE_OF_RX_BUFFER, pJsonHandler, &tokenCount)) {
JMF 0:082731ede69f 498 IOT_WARN("Received JSON is not valid");
JMF 0:082731ede69f 499 return;
JMF 0:082731ede69f 500 }
JMF 0:082731ede69f 501
JMF 0:082731ede69f 502 if(shadowDiscardOldDeltaFlag) {
JMF 0:082731ede69f 503 if(extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) {
JMF 0:082731ede69f 504 if(tempVersionNumber > shadowJsonVersionNum) {
JMF 0:082731ede69f 505 shadowJsonVersionNum = tempVersionNumber;
JMF 0:082731ede69f 506 } else {
JMF 0:082731ede69f 507 IOT_WARN("Old Delta Message received - Ignoring rx: %ld local: %ld", tempVersionNumber,
JMF 0:082731ede69f 508 shadowJsonVersionNum);
JMF 0:082731ede69f 509 return;
JMF 0:082731ede69f 510 }
JMF 0:082731ede69f 511 }
JMF 0:082731ede69f 512 }
JMF 0:082731ede69f 513
JMF 0:082731ede69f 514 for(i = 0; i < tokenTableIndex; i++) {
JMF 0:082731ede69f 515 if(!tokenTable[i].isFree) {
JMF 0:082731ede69f 516 if(isJsonKeyMatchingAndUpdateValue(shadowRxBuf, pJsonHandler, tokenCount,
JMF 0:082731ede69f 517 (jsonStruct_t *) tokenTable[i].pStruct, &dataLength, &DataPosition)) {
JMF 0:082731ede69f 518 if(tokenTable[i].callback != NULL) {
JMF 0:082731ede69f 519 tokenTable[i].callback(shadowRxBuf + DataPosition, dataLength,
JMF 0:082731ede69f 520 (jsonStruct_t *) tokenTable[i].pStruct);
JMF 0:082731ede69f 521 }
JMF 0:082731ede69f 522 }
JMF 0:082731ede69f 523 }
JMF 0:082731ede69f 524 }
JMF 0:082731ede69f 525 }
JMF 0:082731ede69f 526
JMF 0:082731ede69f 527 #ifdef __cplusplus
JMF 0:082731ede69f 528 }
JMF 0:082731ede69f 529 #endif