Jim Flynn / Mbed OS aws-iot-device-sdk-mbed-c
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers jobs_sample.c Source File

jobs_sample.c

00001 /*
00002  * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License").
00005  * You may not use this file except in compliance with the License.
00006  * A copy of the License is located at
00007  *
00008  *  http://aws.amazon.com/apache2.0
00009  *
00010  * or in the "license" file accompanying this file. This file is distributed
00011  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
00012  * express or implied. See the License for the specific language governing
00013  * permissions and limitations under the License.
00014  */
00015 
00016 /**
00017  *
00018  * This example takes the parameters from the aws_iot_config.h file and establishes 
00019  * a connection to the AWS IoT MQTT Platform. It performs several operations to 
00020  * demonstrate the basic capabilities of the AWS IoT Jobs platform.
00021  *
00022  * If all the certs are correct, you should see the list of pending Job Executions 
00023  * printed out by the iot_get_pending_callback_handler. If there are any existing pending 
00024  * job executions each will be processed one at a time in the iot_next_job_callback_handler.
00025  * After all of the pending jobs have been processed the program will wait for
00026  * notifications for new pending jobs and process them one at a time as they come in.
00027  * 
00028  * In the main body you can see how each callback is registered for each corresponding
00029  * Jobs topic.
00030  *
00031  */
00032 #include <stdio.h>
00033 #include <stdlib.h>
00034 #include <ctype.h>
00035 #include <unistd.h>
00036 #include <limits.h>
00037 #include <string.h>
00038 
00039 #include "aws_iot_config.h"
00040 #include "aws_iot_json_utils.h"
00041 #include "aws_iot_log.h"
00042 #include "aws_iot_version.h"
00043 #include "aws_iot_mqtt_client_interface.h"
00044 #include "aws_iot_jobs_interface.h"
00045 
00046 /**
00047  * @brief Default cert location; main() joins it onto the current working directory to build the certificate file paths
00048  */
00049 char certDirectory[PATH_MAX + 1] = "../../../certs";
00050 
00051 /**
00052  * @brief Default MQTT host URL, pulled from aws_iot_config.h (AWS_IOT_MQTT_HOST)
00053  */
00054 char HostAddress[255] = AWS_IOT_MQTT_HOST;
00055 
00056 /**
00057  * @brief Default MQTT port, pulled from aws_iot_config.h (AWS_IOT_MQTT_PORT)
00058  */
00059 uint32_t port = AWS_IOT_MQTT_PORT;
00060 
00061 static jsmn_parser jsonParser; /* JSON parser state; each parsing callback re-initializes it with jsmn_init() */
00062 static jsmntok_t jsonTokenStruct[MAX_JSON_TOKEN_EXPECTED]; /* Token buffer handed to jsmn_parse() and findToken() */
00063 static int32_t tokenCount; /* Return value of the most recent jsmn_parse() call; negative is treated as a parse failure */
00064 
00065 void iot_get_pending_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen,
00066                                     IoT_Publish_Message_Params *params, void *pData) {
00067     IOT_UNUSED(pData);
00068     IOT_UNUSED(pClient);
00069     IOT_INFO("\nJOB_GET_PENDING_TOPIC callback");
00070     IOT_INFO("topic: %.*s", topicNameLen, topicName);
00071     IOT_INFO("payload: %.*s", (int) params->payloadLen, (char *)params->payload);
00072 
00073     jsmn_init(&jsonParser);
00074 
00075     tokenCount = jsmn_parse(&jsonParser, params->payload, (int) params->payloadLen, jsonTokenStruct, MAX_JSON_TOKEN_EXPECTED);
00076 
00077     if(tokenCount < 0) {
00078         IOT_WARN("Failed to parse JSON: %d", tokenCount);
00079         return;
00080     }
00081 
00082     /* Assume the top-level element is an object */
00083     if(tokenCount < 1 || jsonTokenStruct[0].type != JSMN_OBJECT) {
00084         IOT_WARN("Top Level is not an object");
00085         return;
00086     }
00087 
00088     jsmntok_t *jobs;
00089 
00090     jobs = findToken("inProgressJobs", params->payload, jsonTokenStruct);
00091 
00092     if (jobs) {
00093         IOT_INFO("inProgressJobs: %.*s", jobs->end - jobs->start, (char *)params->payload + jobs->start);
00094     }   
00095 
00096     jobs = findToken("queuedJobs", params->payload, jsonTokenStruct);
00097 
00098     if (jobs) {
00099         IOT_INFO("queuedJobs: %.*s", jobs->end - jobs->start, (char *)params->payload + jobs->start);
00100     }
00101 }
00102 
00103 void iot_next_job_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen,
00104                                     IoT_Publish_Message_Params *params, void *pData) {
00105     char topicToPublishUpdate[MAX_JOB_TOPIC_LENGTH_BYTES];
00106     char messageBuffer[200];
00107 
00108     IOT_UNUSED(pData);
00109     IOT_UNUSED(pClient);
00110     IOT_INFO("\nJOB_NOTIFY_NEXT_TOPIC / JOB_DESCRIBE_TOPIC($next) callback");
00111     IOT_INFO("topic: %.*s", topicNameLen, topicName);
00112     IOT_INFO("payload: %.*s", (int) params->payloadLen, (char *)params->payload);
00113 
00114     jsmn_init(&jsonParser);
00115 
00116     tokenCount = jsmn_parse(&jsonParser, params->payload, (int) params->payloadLen, jsonTokenStruct, MAX_JSON_TOKEN_EXPECTED);
00117 
00118     if(tokenCount < 0) {
00119         IOT_WARN("Failed to parse JSON: %d", tokenCount);
00120         return;
00121     }
00122 
00123     /* Assume the top-level element is an object */
00124     if(tokenCount < 1 || jsonTokenStruct[0].type != JSMN_OBJECT) {
00125         IOT_WARN("Top Level is not an object");
00126         return;
00127     }
00128 
00129     jsmntok_t *tokExecution;
00130 
00131     tokExecution = findToken("execution", params->payload, jsonTokenStruct);
00132 
00133     if (tokExecution) {
00134         IOT_INFO("execution: %.*s", tokExecution->end - tokExecution->start, (char *)params->payload + tokExecution->start);
00135 
00136         jsmntok_t *tok;
00137 
00138         tok = findToken("jobId", params->payload, tokExecution);
00139 
00140         if (tok) {
00141             IoT_Error_t rc;
00142             char jobId[MAX_SIZE_OF_JOB_ID + 1];
00143             AwsIotJobExecutionUpdateRequest updateRequest;
00144 
00145             rc = parseStringValue(jobId, MAX_SIZE_OF_JOB_ID + 1, params->payload, tok);         
00146             if(SUCCESS != rc) {
00147                 IOT_ERROR("parseStringValue returned error : %d ", rc);
00148                 return;
00149             }
00150 
00151             IOT_INFO("jobId: %s", jobId);
00152 
00153             tok = findToken("jobDocument", params->payload, tokExecution);
00154 
00155             /*
00156              * Do your job processing here.
00157              */
00158 
00159             if (tok) {
00160                 IOT_INFO("jobDocument: %.*s", tok->end - tok->start, (char *)params->payload + tok->start);
00161                 /* Alternatively if the job still has more steps the status can be set to JOB_EXECUTION_IN_PROGRESS instead */
00162                 updateRequest.status = JOB_EXECUTION_SUCCEEDED;
00163                 updateRequest.statusDetails = "{\"exampleDetail\":\"a value appropriate for your successful job\"}";
00164             } else {
00165                 updateRequest.status = JOB_EXECUTION_FAILED;
00166                 updateRequest.statusDetails = "{\"failureDetail\":\"Unable to process job document\"}";
00167             }
00168 
00169             updateRequest.expectedVersion = 0;
00170             updateRequest.executionNumber = 0;
00171             updateRequest.includeJobExecutionState = false;
00172             updateRequest.includeJobDocument = false;
00173             updateRequest.clientToken = NULL;
00174 
00175             rc = aws_iot_jobs_send_update(pClient, QOS0, AWS_IOT_MY_THING_NAME, jobId, &updateRequest, 
00176                     topicToPublishUpdate, sizeof(topicToPublishUpdate), messageBuffer, sizeof(messageBuffer));
00177         }
00178     } else {
00179         IOT_INFO("execution property not found, nothing to do");        
00180     }
00181 }
00182 
00183 void iot_update_accepted_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen,
00184                                     IoT_Publish_Message_Params *params, void *pData) {
00185     IOT_UNUSED(pData);
00186     IOT_UNUSED(pClient);
00187     IOT_INFO("\nJOB_UPDATE_TOPIC / accepted callback");
00188     IOT_INFO("topic: %.*s", topicNameLen, topicName);
00189     IOT_INFO("payload: %.*s", (int) params->payloadLen, (char *)params->payload);
00190 }
00191 
00192 void iot_update_rejected_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen,
00193                                     IoT_Publish_Message_Params *params, void *pData) {
00194     IOT_UNUSED(pData);
00195     IOT_UNUSED(pClient);
00196     IOT_INFO("\nJOB_UPDATE_TOPIC / rejected callback");
00197     IOT_INFO("topic: %.*s", topicNameLen, topicName);
00198     IOT_INFO("payload: %.*s", (int) params->payloadLen, (char *)params->payload);
00199 
00200     /* Do error handling here for when the update was rejected */
00201 }
00202 
00203 void disconnectCallbackHandler(AWS_IoT_Client *pClient, void *data) {
00204     IOT_WARN("MQTT Disconnect");
00205     IoT_Error_t rc = FAILURE;
00206 
00207     if(NULL == pClient) {
00208         return;
00209     }
00210 
00211     IOT_UNUSED(data);
00212 
00213     if(aws_iot_is_autoreconnect_enabled(pClient)) {
00214         IOT_INFO("Auto Reconnect is enabled, Reconnecting attempt will start now");
00215     } else {
00216         IOT_WARN("Auto Reconnect not enabled. Starting manual reconnect...");
00217         rc = aws_iot_mqtt_attempt_reconnect(pClient);
00218         if(NETWORK_RECONNECTED == rc) {
00219             IOT_WARN("Manual Reconnect Successful");
00220         } else {
00221             IOT_WARN("Manual Reconnect Failed - %d", rc);
00222         }
00223     }
00224 }
00225 
00226 int main(int argc, char **argv) {
00227     char rootCA[PATH_MAX + 1];
00228     char clientCRT[PATH_MAX + 1];
00229     char clientKey[PATH_MAX + 1];
00230     char CurrentWD[PATH_MAX + 1];
00231     char cPayload[100];
00232 
00233     int32_t i = 0;
00234 
00235     IoT_Error_t rc = FAILURE;
00236 
00237     AWS_IoT_Client client;
00238     IoT_Client_Init_Params mqttInitParams = iotClientInitParamsDefault;
00239     IoT_Client_Connect_Params connectParams = iotClientConnectParamsDefault;
00240 
00241     IoT_Publish_Message_Params paramsQOS0;
00242 
00243     getcwd(CurrentWD, sizeof(CurrentWD));
00244     snprintf(rootCA, PATH_MAX + 1, "%s/%s/%s", CurrentWD, certDirectory, AWS_IOT_ROOT_CA_FILENAME);
00245     snprintf(clientCRT, PATH_MAX + 1, "%s/%s/%s", CurrentWD, certDirectory, AWS_IOT_CERTIFICATE_FILENAME);
00246     snprintf(clientKey, PATH_MAX + 1, "%s/%s/%s", CurrentWD, certDirectory, AWS_IOT_PRIVATE_KEY_FILENAME);
00247 
00248     IOT_DEBUG("rootCA %s", rootCA);
00249     IOT_DEBUG("clientCRT %s", clientCRT);
00250     IOT_DEBUG("clientKey %s", clientKey);
00251 
00252     mqttInitParams.enableAutoReconnect = false; // We enable this later below
00253     mqttInitParams.pHostURL = HostAddress;
00254     mqttInitParams.port = port;
00255     mqttInitParams.pRootCALocation = rootCA;
00256     mqttInitParams.pDeviceCertLocation = clientCRT;
00257     mqttInitParams.pDevicePrivateKeyLocation = clientKey;
00258     mqttInitParams.mqttCommandTimeout_ms = 20000;
00259     mqttInitParams.tlsHandshakeTimeout_ms = 5000;
00260     mqttInitParams.isSSLHostnameVerify = true;
00261     mqttInitParams.disconnectHandler = disconnectCallbackHandler;
00262     mqttInitParams.disconnectHandlerData = NULL;
00263 
00264     rc = aws_iot_mqtt_init(&client, &mqttInitParams);
00265     if(SUCCESS != rc) {
00266         IOT_ERROR("aws_iot_mqtt_init returned error : %d ", rc);
00267         return rc;
00268     }
00269 
00270     connectParams.keepAliveIntervalInSec = 600;
00271     connectParams.isCleanSession = true;
00272     connectParams.MQTTVersion = MQTT_3_1_1;
00273     connectParams.pClientID = AWS_IOT_MQTT_CLIENT_ID;
00274     connectParams.clientIDLen = (uint16_t) strlen(AWS_IOT_MQTT_CLIENT_ID);
00275     connectParams.isWillMsgPresent = false;
00276 
00277     IOT_INFO("Connecting...");
00278     rc = aws_iot_mqtt_connect(&client, &connectParams);
00279     if(SUCCESS != rc) {
00280         IOT_ERROR("Error(%d) connecting to %s:%d", rc, mqttInitParams.pHostURL, mqttInitParams.port);
00281         return rc;
00282     }
00283     /*
00284      * Enable Auto Reconnect functionality. Minimum and Maximum time of Exponential backoff are set in aws_iot_config.h
00285      *  #AWS_IOT_MQTT_MIN_RECONNECT_WAIT_INTERVAL
00286      *  #AWS_IOT_MQTT_MAX_RECONNECT_WAIT_INTERVAL
00287      */
00288     rc = aws_iot_mqtt_autoreconnect_set_status(&client, true);
00289     if(SUCCESS != rc) {
00290         IOT_ERROR("Unable to set Auto Reconnect to true - %d", rc);
00291         return rc;
00292     }
00293 
00294     char topicToSubscribeGetPending[MAX_JOB_TOPIC_LENGTH_BYTES];
00295     char topicToSubscribeNotifyNext[MAX_JOB_TOPIC_LENGTH_BYTES];
00296     char topicToSubscribeGetNext[MAX_JOB_TOPIC_LENGTH_BYTES];
00297     char topicToSubscribeUpdateAccepted[MAX_JOB_TOPIC_LENGTH_BYTES];
00298     char topicToSubscribeUpdateRejected[MAX_JOB_TOPIC_LENGTH_BYTES];
00299 
00300     char topicToPublishGetPending[MAX_JOB_TOPIC_LENGTH_BYTES];
00301     char topicToPublishGetNext[MAX_JOB_TOPIC_LENGTH_BYTES];
00302 
00303     rc = aws_iot_jobs_subscribe_to_job_messages(
00304         &client, QOS0, AWS_IOT_MY_THING_NAME, NULL, JOB_GET_PENDING_TOPIC, JOB_WILDCARD_REPLY_TYPE,
00305         iot_get_pending_callback_handler, NULL, topicToSubscribeGetPending, sizeof(topicToSubscribeGetPending));
00306 
00307     if(SUCCESS != rc) {
00308         IOT_ERROR("Error subscribing JOB_GET_PENDING_TOPIC: %d ", rc);
00309         return rc;
00310     }
00311 
00312     rc = aws_iot_jobs_subscribe_to_job_messages(
00313         &client, QOS0, AWS_IOT_MY_THING_NAME, NULL, JOB_NOTIFY_NEXT_TOPIC, JOB_REQUEST_TYPE,
00314         iot_next_job_callback_handler, NULL, topicToSubscribeNotifyNext, sizeof(topicToSubscribeNotifyNext));
00315 
00316     if(SUCCESS != rc) {
00317         IOT_ERROR("Error subscribing JOB_NOTIFY_NEXT_TOPIC: %d ", rc);
00318         return rc;
00319     }
00320 
00321     rc = aws_iot_jobs_subscribe_to_job_messages(
00322         &client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_NEXT, JOB_DESCRIBE_TOPIC, JOB_WILDCARD_REPLY_TYPE,
00323         iot_next_job_callback_handler, NULL, topicToSubscribeGetNext, sizeof(topicToSubscribeGetNext));
00324 
00325     if(SUCCESS != rc) {
00326         IOT_ERROR("Error subscribing JOB_DESCRIBE_TOPIC ($next): %d ", rc);
00327         return rc;
00328     }
00329 
00330     rc = aws_iot_jobs_subscribe_to_job_messages(
00331         &client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_WILDCARD, JOB_UPDATE_TOPIC, JOB_ACCEPTED_REPLY_TYPE,
00332         iot_update_accepted_callback_handler, NULL, topicToSubscribeUpdateAccepted, sizeof(topicToSubscribeUpdateAccepted));
00333 
00334     if(SUCCESS != rc) {
00335         IOT_ERROR("Error subscribing JOB_UPDATE_TOPIC/accepted: %d ", rc);
00336         return rc;
00337     }
00338 
00339     rc = aws_iot_jobs_subscribe_to_job_messages(
00340         &client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_WILDCARD, JOB_UPDATE_TOPIC, JOB_REJECTED_REPLY_TYPE,
00341         iot_update_rejected_callback_handler, NULL, topicToSubscribeUpdateRejected, sizeof(topicToSubscribeUpdateRejected));
00342 
00343     if(SUCCESS != rc) {
00344         IOT_ERROR("Error subscribing JOB_UPDATE_TOPIC/rejected: %d ", rc);
00345         return rc;
00346     }
00347 
00348     paramsQOS0.qos = QOS0;
00349     paramsQOS0.payload = (void *) cPayload;
00350     paramsQOS0.isRetained = 0;
00351     paramsQOS0.payloadLen = strlen(cPayload);
00352 
00353     rc = aws_iot_jobs_send_query(&client, QOS0, AWS_IOT_MY_THING_NAME, NULL, NULL, topicToPublishGetPending, sizeof(topicToPublishGetPending), NULL, 0, JOB_GET_PENDING_TOPIC);
00354 
00355     AwsIotDescribeJobExecutionRequest describeRequest;
00356     describeRequest.executionNumber = 0;
00357     describeRequest.includeJobDocument = true;
00358     describeRequest.clientToken = NULL;
00359 
00360     rc = aws_iot_jobs_describe(&client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_NEXT, &describeRequest, topicToPublishGetNext, sizeof(topicToPublishGetNext), NULL, 0);
00361 
00362     while(SUCCESS == rc) {
00363         //Max time the yield function will wait for read messages
00364         rc = aws_iot_mqtt_yield(&client, 50000);
00365     }
00366 
00367     return rc;
00368 }