Changes to enabled on-line compiler

src/aws_iot_shadow_records.c

Committer:
JMF
Date:
2018-05-30
Revision:
0:082731ede69f

File content as of revision 0:082731ede69f:

/*
 * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

/**
 * @file aws_iot_mqtt_client_subscribe.c
 * @brief MQTT client subscribe API definitions
 */

#ifdef __cplusplus
extern "C" {
#endif

#include "aws_iot_shadow_records.h"

#include <string.h>
#include <stdio.h>

#include "timer_interface.h"
#include "aws_iot_json_utils.h"
#include "aws_iot_log.h"
#include "aws_iot_shadow_json.h"
#include "aws_iot_config.h"

typedef struct {
	char clientTokenID[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE];
	char thingName[MAX_SIZE_OF_THING_NAME];
	ShadowActions_t action;
	fpActionCallback_t callback;
	void *pCallbackContext;
	bool isFree;
	awsTimer timer;
} ToBeReceivedAckRecord_t;

typedef struct {
	const char *pKey;
	void *pStruct;
	jsonStructCallback_t callback;
	bool isFree;
} JsonTokenTable_t;

typedef struct {
	char Topic[MAX_SHADOW_TOPIC_LENGTH_BYTES];
	uint8_t count;
	bool isFree;
	bool isSticky;
} SubscriptionRecord_t;

typedef enum {
	SHADOW_ACCEPTED, SHADOW_REJECTED, SHADOW_ACTION
} ShadowAckTopicTypes_t;

ToBeReceivedAckRecord_t AckWaitList[MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME];

AWS_IoT_Client *pMqttClient;

char myThingName[MAX_SIZE_OF_THING_NAME];
char mqttClientID[MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES];

char shadowDeltaTopic[MAX_SHADOW_TOPIC_LENGTH_BYTES];

#define MAX_TOPICS_AT_ANY_GIVEN_TIME 2*MAX_THINGNAME_HANDLED_AT_ANY_GIVEN_TIME
SubscriptionRecord_t SubscriptionList[MAX_TOPICS_AT_ANY_GIVEN_TIME];

#define SUBSCRIBE_SETTLING_TIME 2
char shadowRxBuf[SHADOW_MAX_SIZE_OF_RX_BUFFER];

static JsonTokenTable_t tokenTable[MAX_JSON_TOKEN_EXPECTED];
static uint32_t tokenTableIndex = 0;
static bool deltaTopicSubscribedFlag = false;
uint32_t shadowJsonVersionNum = 0;
bool shadowDiscardOldDeltaFlag = true;

// local helper functions
static void AckStatusCallback(AWS_IoT_Client *pClient, char *topicName,
							  uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData);

static void shadow_delta_callback(AWS_IoT_Client *pClient, char *topicName,
								  uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData);

static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action,
										ShadowAckTopicTypes_t ackType);

static int16_t getNextFreeIndexOfSubscriptionList(void);

static void unsubscribeFromAcceptedAndRejected(uint8_t index);

void initDeltaTokens(void) {
	uint32_t i;
	for(i = 0; i < MAX_JSON_TOKEN_EXPECTED; i++) {
		tokenTable[i].isFree = true;
	}
	tokenTableIndex = 0;
	deltaTopicSubscribedFlag = false;
}

IoT_Error_t registerJsonTokenOnDelta(jsonStruct_t *pStruct) {

	IoT_Error_t rc = AWS_SUCCESS;

	if(!deltaTopicSubscribedFlag) {
		snprintf(shadowDeltaTopic, MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/update/delta", myThingName);
		rc = aws_iot_mqtt_subscribe(pMqttClient, shadowDeltaTopic, (uint16_t) strlen(shadowDeltaTopic), QOS0,
									shadow_delta_callback, NULL);
		deltaTopicSubscribedFlag = true;
	}

	if(tokenTableIndex >= MAX_JSON_TOKEN_EXPECTED) {
		return FAILURE;
	}

	tokenTable[tokenTableIndex].pKey = pStruct->pKey;
	tokenTable[tokenTableIndex].callback = pStruct->cb;
	tokenTable[tokenTableIndex].pStruct = pStruct;
	tokenTable[tokenTableIndex].isFree = false;
	tokenTableIndex++;

	return rc;
}

static int16_t getNextFreeIndexOfSubscriptionList(void) {
	uint8_t i;
	for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
		if(SubscriptionList[i].isFree) {
			SubscriptionList[i].isFree = false;
			return i;
		}
	}
	return -1;
}

static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action,
										ShadowAckTopicTypes_t ackType) {

	char actionBuf[10];
	char ackTypeBuf[10];

	if(SHADOW_GET == action) {
		strncpy(actionBuf, "get", 10);
	} else if(SHADOW_UPDATE == action) {
		strncpy(actionBuf, "update", 10);
	} else if(SHADOW_DELETE == action) {
		strncpy(actionBuf, "delete", 10);
	}

	if(SHADOW_ACCEPTED == ackType) {
		strncpy(ackTypeBuf, "accepted", 10);
	} else if(SHADOW_REJECTED == ackType) {
		strncpy(ackTypeBuf, "rejected", 10);
	}

	if(SHADOW_ACTION == ackType) {
		snprintf(pTopic, MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/%s", pThingName, actionBuf);
	} else {
		snprintf(pTopic, MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/%s/%s", pThingName, actionBuf,
				 ackTypeBuf);
	}
}

static bool isValidShadowVersionUpdate(const char *pTopicName) {
	if(strstr(pTopicName, myThingName) != NULL &&
	   ((strstr(pTopicName, "get/accepted") != NULL) ||
		(strstr(pTopicName, "delta") != NULL))) {
		return true;
	}
	return false;
}

static void AckStatusCallback(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen,
							  IoT_Publish_Message_Params *params, void *pData) {
	int32_t tokenCount;
	uint8_t i;
	void *pJsonHandler = NULL;
	char temporaryClientToken[MAX_SIZE_CLIENT_TOKEN_CLIENT_SEQUENCE];

	IOT_UNUSED(pClient);
	IOT_UNUSED(topicNameLen);
	IOT_UNUSED(pData);

	if(params->payloadLen >= SHADOW_MAX_SIZE_OF_RX_BUFFER) {
		IOT_WARN("Payload larger than RX Buffer");
		return;
	}

	memcpy(shadowRxBuf, params->payload, params->payloadLen);
	shadowRxBuf[params->payloadLen] = '\0';    // jsmn_parse relies on a string

	if(!isJsonValidAndParse(shadowRxBuf, SHADOW_MAX_SIZE_OF_RX_BUFFER, pJsonHandler, &tokenCount)) {
		IOT_WARN("Received JSON is not valid");
		return;
	}

	if(isValidShadowVersionUpdate(topicName)) {
		uint32_t tempVersionNumber = 0;
		if(extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) {
			if(tempVersionNumber > shadowJsonVersionNum) {
				shadowJsonVersionNum = tempVersionNumber;
			}
		}
	}

	if(extractClientToken(shadowRxBuf, SHADOW_MAX_SIZE_OF_RX_BUFFER, temporaryClientToken, MAX_SIZE_CLIENT_TOKEN_CLIENT_SEQUENCE)) {
		for(i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
			if(!AckWaitList[i].isFree) {
				if(strcmp(AckWaitList[i].clientTokenID, temporaryClientToken) == 0) {
					Shadow_Ack_Status_t status = SHADOW_ACK_REJECTED;
					if(strstr(topicName, "accepted") != NULL) {
						status = SHADOW_ACK_ACCEPTED;
					} else if(strstr(topicName, "rejected") != NULL) {
						status = SHADOW_ACK_REJECTED;
					}
					if(status == SHADOW_ACK_ACCEPTED || status == SHADOW_ACK_REJECTED) {
						if(AckWaitList[i].callback != NULL) {
							AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, status,
													shadowRxBuf, AckWaitList[i].pCallbackContext);
						}
						unsubscribeFromAcceptedAndRejected(i);
						AckWaitList[i].isFree = true;
						return;
					}
				}
			}
		}
	}
}

static int16_t findIndexOfSubscriptionList(const char *pTopic) {
	uint8_t i;
	for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
		if(!SubscriptionList[i].isFree) {
			if((strcmp(pTopic, SubscriptionList[i].Topic) == 0)) {
				return i;
			}
		}
	}
	return -1;
}

static void unsubscribeFromAcceptedAndRejected(uint8_t index) {

	char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
	char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
	IoT_Error_t ret_val = AWS_SUCCESS;

	int16_t indexSubList;

	topicNameFromThingAndAction(TemporaryTopicNameAccepted, AckWaitList[index].thingName, AckWaitList[index].action,
								SHADOW_ACCEPTED);
	topicNameFromThingAndAction(TemporaryTopicNameRejected, AckWaitList[index].thingName, AckWaitList[index].action,
								SHADOW_REJECTED);

	indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameAccepted);
	if((indexSubList >= 0)) {
		if(!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) {
			ret_val = aws_iot_mqtt_unsubscribe(pMqttClient, TemporaryTopicNameAccepted,
											   (uint16_t) strlen(TemporaryTopicNameAccepted));
			if(ret_val == AWS_SUCCESS) {
				SubscriptionList[indexSubList].isFree = true;
			}
		} else if(SubscriptionList[indexSubList].count > 1) {
			SubscriptionList[indexSubList].count--;
		}
	}

	indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameRejected);
	if((indexSubList >= 0)) {
		if(!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) {
			ret_val = aws_iot_mqtt_unsubscribe(pMqttClient, TemporaryTopicNameRejected,
											   (uint16_t) strlen(TemporaryTopicNameRejected));
			if(ret_val == AWS_SUCCESS) {
				SubscriptionList[indexSubList].isFree = true;
			}
		} else if(SubscriptionList[indexSubList].count > 1) {
			SubscriptionList[indexSubList].count--;
		}
	}
}

void initializeRecords(AWS_IoT_Client *pClient) {
	uint8_t i;
	for(i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
		AckWaitList[i].isFree = true;
	}
	for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
		SubscriptionList[i].isFree = true;
		SubscriptionList[i].count = 0;
		SubscriptionList[i].isSticky = false;
	}

	pMqttClient = pClient;
}

bool isSubscriptionPresent(const char *pThingName, ShadowActions_t action) {

	uint8_t i = 0;
	bool isAcceptedPresent = false;
	bool isRejectedPresent = false;
	char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
	char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];

	topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED);
	topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED);

	for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
		if(!SubscriptionList[i].isFree) {
			if((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)) {
				isAcceptedPresent = true;
			} else if((strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) {
				isRejectedPresent = true;
			}
		}
	}

	if(isRejectedPresent && isAcceptedPresent) {
		return true;
	}

	return false;
}

IoT_Error_t subscribeToShadowActionAcks(const char *pThingName, ShadowActions_t action, bool isSticky) {
	IoT_Error_t ret_val = AWS_SUCCESS;

	bool clearBothEntriesFromList = true;
	int16_t indexAcceptedSubList = 0;
	int16_t indexRejectedSubList = 0;
	awsTimer subSettlingtimer;
	indexAcceptedSubList = getNextFreeIndexOfSubscriptionList();
	indexRejectedSubList = getNextFreeIndexOfSubscriptionList();

	if(indexAcceptedSubList >= 0 && indexRejectedSubList >= 0) {
		topicNameFromThingAndAction(SubscriptionList[indexAcceptedSubList].Topic, pThingName, action, SHADOW_ACCEPTED);
		ret_val = aws_iot_mqtt_subscribe(pMqttClient, SubscriptionList[indexAcceptedSubList].Topic,
										 (uint16_t) strlen(SubscriptionList[indexAcceptedSubList].Topic), QOS0,
										 AckStatusCallback, NULL);
		if(ret_val == AWS_SUCCESS) {
			SubscriptionList[indexAcceptedSubList].count = 1;
			SubscriptionList[indexAcceptedSubList].isSticky = isSticky;
			topicNameFromThingAndAction(SubscriptionList[indexRejectedSubList].Topic, pThingName, action,
										SHADOW_REJECTED);
			ret_val = aws_iot_mqtt_subscribe(pMqttClient, SubscriptionList[indexRejectedSubList].Topic,
											 (uint16_t) strlen(SubscriptionList[indexRejectedSubList].Topic), QOS0,
											 AckStatusCallback, NULL);
			if(ret_val == AWS_SUCCESS) {
				SubscriptionList[indexRejectedSubList].count = 1;
				SubscriptionList[indexRejectedSubList].isSticky = isSticky;
				clearBothEntriesFromList = false;

				// wait for SUBSCRIBE_SETTLING_TIME seconds to let the subscription take effect
				init_timer(&subSettlingtimer);
				countdown_sec(&subSettlingtimer, SUBSCRIBE_SETTLING_TIME);
				while(!has_timer_expired(&subSettlingtimer));

			}
		}
	}

	if(clearBothEntriesFromList) {
		if(indexAcceptedSubList >= 0) {
			SubscriptionList[indexAcceptedSubList].isFree = true;
			
			if(SubscriptionList[indexAcceptedSubList].count == 1) {
			    aws_iot_mqtt_unsubscribe(pMqttClient, SubscriptionList[indexAcceptedSubList].Topic,
				(uint16_t) strlen(SubscriptionList[indexAcceptedSubList].Topic));
		    }
		}
		if(indexRejectedSubList >= 0) {
			SubscriptionList[indexRejectedSubList].isFree = true;
		}

	}

	return ret_val;
}

void incrementSubscriptionCnt(const char *pThingName, ShadowActions_t action, bool isSticky) {
	char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES];
	char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES];
	uint8_t i;
	topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED);
	topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED);

	for(i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) {
		if(!SubscriptionList[i].isFree) {
			if((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)
			   || (strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) {
				SubscriptionList[i].count++;
				SubscriptionList[i].isSticky = isSticky;
			}
		}
	}
}

IoT_Error_t publishToShadowAction(const char *pThingName, ShadowActions_t action, const char *pJsonDocumentToBeSent) {
	IoT_Error_t ret_val = AWS_SUCCESS;
	char TemporaryTopicName[MAX_SHADOW_TOPIC_LENGTH_BYTES];
	IoT_Publish_Message_Params msgParams;

	if(NULL == pThingName || NULL == pJsonDocumentToBeSent) {
		return NULL_VALUE_ERROR;
	}

	topicNameFromThingAndAction(TemporaryTopicName, pThingName, action, SHADOW_ACTION);

	msgParams.qos = QOS0;
	msgParams.isRetained = 0;
	msgParams.payloadLen = strlen(pJsonDocumentToBeSent);
	msgParams.payload = (char *) pJsonDocumentToBeSent;
	ret_val = aws_iot_mqtt_publish(pMqttClient, TemporaryTopicName, (uint16_t) strlen(TemporaryTopicName), &msgParams);

	return ret_val;
}

bool getNextFreeIndexOfAckWaitList(uint8_t *pIndex) {
	uint8_t i;
	bool rc = false;

	if(NULL == pIndex) {
		return false;
	}

	for(i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
		if(AckWaitList[i].isFree) {
			*pIndex = i;
			rc = true;
			break;
		}
	}

	return rc;
}

void addToAckWaitList(uint8_t indexAckWaitList, const char *pThingName, ShadowActions_t action,
					  const char *pExtractedClientToken, fpActionCallback_t callback, void *pCallbackContext,
					  uint32_t timeout_seconds) {
	AckWaitList[indexAckWaitList].callback = callback;
	memcpy(AckWaitList[indexAckWaitList].clientTokenID, pExtractedClientToken, MAX_SIZE_CLIENT_ID_WITH_SEQUENCE);
	memcpy(AckWaitList[indexAckWaitList].thingName, pThingName, MAX_SIZE_OF_THING_NAME);
	AckWaitList[indexAckWaitList].pCallbackContext = pCallbackContext;
	AckWaitList[indexAckWaitList].action = action;
	init_timer(&(AckWaitList[indexAckWaitList].timer));
	countdown_sec(&(AckWaitList[indexAckWaitList].timer), timeout_seconds);
	AckWaitList[indexAckWaitList].isFree = false;
}

void HandleExpiredResponseCallbacks(void) {
	uint8_t i;
	for(i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) {
		if(!AckWaitList[i].isFree) {
			if(has_timer_expired(&(AckWaitList[i].timer))) {
				if(AckWaitList[i].callback != NULL) {
					AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, SHADOW_ACK_TIMEOUT,
											shadowRxBuf, AckWaitList[i].pCallbackContext);
				}
				AckWaitList[i].isFree = true;
				unsubscribeFromAcceptedAndRejected(i);
			}
		}
	}
}

static void shadow_delta_callback(AWS_IoT_Client *pClient, char *topicName,
								  uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData) {
	int32_t tokenCount;
	uint32_t i = 0;
	void *pJsonHandler = NULL;
	int32_t DataPosition;
	uint32_t dataLength;
	uint32_t tempVersionNumber = 0;

	FUNC_ENTRY;

	IOT_UNUSED(pClient);
	IOT_UNUSED(topicName);
	IOT_UNUSED(topicNameLen);
	IOT_UNUSED(pData);

	if(params->payloadLen >= SHADOW_MAX_SIZE_OF_RX_BUFFER) {
		IOT_WARN("Payload larger than RX Buffer");
		return;
	}

	memcpy(shadowRxBuf, params->payload, params->payloadLen);
	shadowRxBuf[params->payloadLen] = '\0';    // jsmn_parse relies on a string

	if(!isJsonValidAndParse(shadowRxBuf, SHADOW_MAX_SIZE_OF_RX_BUFFER, pJsonHandler, &tokenCount)) {
		IOT_WARN("Received JSON is not valid");
		return;
	}

	if(shadowDiscardOldDeltaFlag) {
		if(extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) {
			if(tempVersionNumber > shadowJsonVersionNum) {
				shadowJsonVersionNum = tempVersionNumber;
			} else {
				IOT_WARN("Old Delta Message received - Ignoring rx: %ld local: %ld", tempVersionNumber,
						 shadowJsonVersionNum);
				return;
			}
		}
	}

	for(i = 0; i < tokenTableIndex; i++) {
		if(!tokenTable[i].isFree) {
			if(isJsonKeyMatchingAndUpdateValue(shadowRxBuf, pJsonHandler, tokenCount,
											   (jsonStruct_t *) tokenTable[i].pStruct, &dataLength, &DataPosition)) {
				if(tokenTable[i].callback != NULL) {
					tokenTable[i].callback(shadowRxBuf + DataPosition, dataLength,
										   (jsonStruct_t *) tokenTable[i].pStruct);
				}
			}
		}
	}
}

#ifdef __cplusplus
}
#endif