Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
jobs_sample.c
00001 /* 00002 * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"). 00005 * You may not use this file except in compliance with the License. 00006 * A copy of the License is located at 00007 * 00008 * http://aws.amazon.com/apache2.0 00009 * 00010 * or in the "license" file accompanying this file. This file is distributed 00011 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 00012 * express or implied. See the License for the specific language governing 00013 * permissions and limitations under the License. 00014 */ 00015 00016 /** 00017 * 00018 * This example takes the parameters from the aws_iot_config.h file and establishes 00019 * a connection to the AWS IoT MQTT Platform. It performs several operations to 00020 * demonstrate the basic capabilities of the AWS IoT Jobs platform. 00021 * 00022 * If all the certs are correct, you should see the list of pending Job Executions 00023 * printed out by the iot_get_pending_callback_handler. If there are any existing pending 00024 * job executions each will be processed one at a time in the iot_next_job_callback_handler. 00025 * After all of the pending jobs have been processed the program will wait for 00026 * notifications for new pending jobs and process them one at a time as they come in. 00027 * 00028 * In the main body you can see how each callback is registered for each corresponding 00029 * Jobs topic. 
00030 * 00031 */ 00032 #include <stdio.h> 00033 #include <stdlib.h> 00034 #include <ctype.h> 00035 #include <unistd.h> 00036 #include <limits.h> 00037 #include <string.h> 00038 00039 #include "aws_iot_config.h" 00040 #include "aws_iot_json_utils.h" 00041 #include "aws_iot_log.h" 00042 #include "aws_iot_version.h" 00043 #include "aws_iot_mqtt_client_interface.h" 00044 #include "aws_iot_jobs_interface.h" 00045 00046 /** 00047 * @brief Default cert location 00048 */ 00049 char certDirectory[PATH_MAX + 1] = "../../../certs"; 00050 00051 /** 00052 * @brief Default MQTT HOST URL is pulled from the aws_iot_config.h 00053 */ 00054 char HostAddress[255] = AWS_IOT_MQTT_HOST; 00055 00056 /** 00057 * @brief Default MQTT port is pulled from the aws_iot_config.h 00058 */ 00059 uint32_t port = AWS_IOT_MQTT_PORT; 00060 00061 static jsmn_parser jsonParser; 00062 static jsmntok_t jsonTokenStruct[MAX_JSON_TOKEN_EXPECTED]; 00063 static int32_t tokenCount; 00064 00065 void iot_get_pending_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, 00066 IoT_Publish_Message_Params *params, void *pData) { 00067 IOT_UNUSED(pData); 00068 IOT_UNUSED(pClient); 00069 IOT_INFO("\nJOB_GET_PENDING_TOPIC callback"); 00070 IOT_INFO("topic: %.*s", topicNameLen, topicName); 00071 IOT_INFO("payload: %.*s", (int) params->payloadLen, (char *)params->payload); 00072 00073 jsmn_init(&jsonParser); 00074 00075 tokenCount = jsmn_parse(&jsonParser, params->payload, (int) params->payloadLen, jsonTokenStruct, MAX_JSON_TOKEN_EXPECTED); 00076 00077 if(tokenCount < 0) { 00078 IOT_WARN("Failed to parse JSON: %d", tokenCount); 00079 return; 00080 } 00081 00082 /* Assume the top-level element is an object */ 00083 if(tokenCount < 1 || jsonTokenStruct[0].type != JSMN_OBJECT) { 00084 IOT_WARN("Top Level is not an object"); 00085 return; 00086 } 00087 00088 jsmntok_t *jobs; 00089 00090 jobs = findToken("inProgressJobs", params->payload, jsonTokenStruct); 00091 00092 if (jobs) { 00093 
IOT_INFO("inProgressJobs: %.*s", jobs->end - jobs->start, (char *)params->payload + jobs->start); 00094 } 00095 00096 jobs = findToken("queuedJobs", params->payload, jsonTokenStruct); 00097 00098 if (jobs) { 00099 IOT_INFO("queuedJobs: %.*s", jobs->end - jobs->start, (char *)params->payload + jobs->start); 00100 } 00101 } 00102 00103 void iot_next_job_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, 00104 IoT_Publish_Message_Params *params, void *pData) { 00105 char topicToPublishUpdate[MAX_JOB_TOPIC_LENGTH_BYTES]; 00106 char messageBuffer[200]; 00107 00108 IOT_UNUSED(pData); 00109 IOT_UNUSED(pClient); 00110 IOT_INFO("\nJOB_NOTIFY_NEXT_TOPIC / JOB_DESCRIBE_TOPIC($next) callback"); 00111 IOT_INFO("topic: %.*s", topicNameLen, topicName); 00112 IOT_INFO("payload: %.*s", (int) params->payloadLen, (char *)params->payload); 00113 00114 jsmn_init(&jsonParser); 00115 00116 tokenCount = jsmn_parse(&jsonParser, params->payload, (int) params->payloadLen, jsonTokenStruct, MAX_JSON_TOKEN_EXPECTED); 00117 00118 if(tokenCount < 0) { 00119 IOT_WARN("Failed to parse JSON: %d", tokenCount); 00120 return; 00121 } 00122 00123 /* Assume the top-level element is an object */ 00124 if(tokenCount < 1 || jsonTokenStruct[0].type != JSMN_OBJECT) { 00125 IOT_WARN("Top Level is not an object"); 00126 return; 00127 } 00128 00129 jsmntok_t *tokExecution; 00130 00131 tokExecution = findToken("execution", params->payload, jsonTokenStruct); 00132 00133 if (tokExecution) { 00134 IOT_INFO("execution: %.*s", tokExecution->end - tokExecution->start, (char *)params->payload + tokExecution->start); 00135 00136 jsmntok_t *tok; 00137 00138 tok = findToken("jobId", params->payload, tokExecution); 00139 00140 if (tok) { 00141 IoT_Error_t rc; 00142 char jobId[MAX_SIZE_OF_JOB_ID + 1]; 00143 AwsIotJobExecutionUpdateRequest updateRequest; 00144 00145 rc = parseStringValue(jobId, MAX_SIZE_OF_JOB_ID + 1, params->payload, tok); 00146 if(SUCCESS != rc) { 00147 
IOT_ERROR("parseStringValue returned error : %d ", rc); 00148 return; 00149 } 00150 00151 IOT_INFO("jobId: %s", jobId); 00152 00153 tok = findToken("jobDocument", params->payload, tokExecution); 00154 00155 /* 00156 * Do your job processing here. 00157 */ 00158 00159 if (tok) { 00160 IOT_INFO("jobDocument: %.*s", tok->end - tok->start, (char *)params->payload + tok->start); 00161 /* Alternatively if the job still has more steps the status can be set to JOB_EXECUTION_IN_PROGRESS instead */ 00162 updateRequest.status = JOB_EXECUTION_SUCCEEDED; 00163 updateRequest.statusDetails = "{\"exampleDetail\":\"a value appropriate for your successful job\"}"; 00164 } else { 00165 updateRequest.status = JOB_EXECUTION_FAILED; 00166 updateRequest.statusDetails = "{\"failureDetail\":\"Unable to process job document\"}"; 00167 } 00168 00169 updateRequest.expectedVersion = 0; 00170 updateRequest.executionNumber = 0; 00171 updateRequest.includeJobExecutionState = false; 00172 updateRequest.includeJobDocument = false; 00173 updateRequest.clientToken = NULL; 00174 00175 rc = aws_iot_jobs_send_update(pClient, QOS0, AWS_IOT_MY_THING_NAME, jobId, &updateRequest, 00176 topicToPublishUpdate, sizeof(topicToPublishUpdate), messageBuffer, sizeof(messageBuffer)); 00177 } 00178 } else { 00179 IOT_INFO("execution property not found, nothing to do"); 00180 } 00181 } 00182 00183 void iot_update_accepted_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, 00184 IoT_Publish_Message_Params *params, void *pData) { 00185 IOT_UNUSED(pData); 00186 IOT_UNUSED(pClient); 00187 IOT_INFO("\nJOB_UPDATE_TOPIC / accepted callback"); 00188 IOT_INFO("topic: %.*s", topicNameLen, topicName); 00189 IOT_INFO("payload: %.*s", (int) params->payloadLen, (char *)params->payload); 00190 } 00191 00192 void iot_update_rejected_callback_handler(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, 00193 IoT_Publish_Message_Params *params, void *pData) { 00194 IOT_UNUSED(pData); 00195 
IOT_UNUSED(pClient); 00196 IOT_INFO("\nJOB_UPDATE_TOPIC / rejected callback"); 00197 IOT_INFO("topic: %.*s", topicNameLen, topicName); 00198 IOT_INFO("payload: %.*s", (int) params->payloadLen, (char *)params->payload); 00199 00200 /* Do error handling here for when the update was rejected */ 00201 } 00202 00203 void disconnectCallbackHandler(AWS_IoT_Client *pClient, void *data) { 00204 IOT_WARN("MQTT Disconnect"); 00205 IoT_Error_t rc = FAILURE; 00206 00207 if(NULL == pClient) { 00208 return; 00209 } 00210 00211 IOT_UNUSED(data); 00212 00213 if(aws_iot_is_autoreconnect_enabled(pClient)) { 00214 IOT_INFO("Auto Reconnect is enabled, Reconnecting attempt will start now"); 00215 } else { 00216 IOT_WARN("Auto Reconnect not enabled. Starting manual reconnect..."); 00217 rc = aws_iot_mqtt_attempt_reconnect(pClient); 00218 if(NETWORK_RECONNECTED == rc) { 00219 IOT_WARN("Manual Reconnect Successful"); 00220 } else { 00221 IOT_WARN("Manual Reconnect Failed - %d", rc); 00222 } 00223 } 00224 } 00225 00226 int main(int argc, char **argv) { 00227 char rootCA[PATH_MAX + 1]; 00228 char clientCRT[PATH_MAX + 1]; 00229 char clientKey[PATH_MAX + 1]; 00230 char CurrentWD[PATH_MAX + 1]; 00231 char cPayload[100]; 00232 00233 int32_t i = 0; 00234 00235 IoT_Error_t rc = FAILURE; 00236 00237 AWS_IoT_Client client; 00238 IoT_Client_Init_Params mqttInitParams = iotClientInitParamsDefault; 00239 IoT_Client_Connect_Params connectParams = iotClientConnectParamsDefault; 00240 00241 IoT_Publish_Message_Params paramsQOS0; 00242 00243 getcwd(CurrentWD, sizeof(CurrentWD)); 00244 snprintf(rootCA, PATH_MAX + 1, "%s/%s/%s", CurrentWD, certDirectory, AWS_IOT_ROOT_CA_FILENAME); 00245 snprintf(clientCRT, PATH_MAX + 1, "%s/%s/%s", CurrentWD, certDirectory, AWS_IOT_CERTIFICATE_FILENAME); 00246 snprintf(clientKey, PATH_MAX + 1, "%s/%s/%s", CurrentWD, certDirectory, AWS_IOT_PRIVATE_KEY_FILENAME); 00247 00248 IOT_DEBUG("rootCA %s", rootCA); 00249 IOT_DEBUG("clientCRT %s", clientCRT); 00250 IOT_DEBUG("clientKey 
%s", clientKey); 00251 00252 mqttInitParams.enableAutoReconnect = false; // We enable this later below 00253 mqttInitParams.pHostURL = HostAddress; 00254 mqttInitParams.port = port; 00255 mqttInitParams.pRootCALocation = rootCA; 00256 mqttInitParams.pDeviceCertLocation = clientCRT; 00257 mqttInitParams.pDevicePrivateKeyLocation = clientKey; 00258 mqttInitParams.mqttCommandTimeout_ms = 20000; 00259 mqttInitParams.tlsHandshakeTimeout_ms = 5000; 00260 mqttInitParams.isSSLHostnameVerify = true; 00261 mqttInitParams.disconnectHandler = disconnectCallbackHandler; 00262 mqttInitParams.disconnectHandlerData = NULL; 00263 00264 rc = aws_iot_mqtt_init(&client, &mqttInitParams); 00265 if(SUCCESS != rc) { 00266 IOT_ERROR("aws_iot_mqtt_init returned error : %d ", rc); 00267 return rc; 00268 } 00269 00270 connectParams.keepAliveIntervalInSec = 600; 00271 connectParams.isCleanSession = true; 00272 connectParams.MQTTVersion = MQTT_3_1_1; 00273 connectParams.pClientID = AWS_IOT_MQTT_CLIENT_ID; 00274 connectParams.clientIDLen = (uint16_t) strlen(AWS_IOT_MQTT_CLIENT_ID); 00275 connectParams.isWillMsgPresent = false; 00276 00277 IOT_INFO("Connecting..."); 00278 rc = aws_iot_mqtt_connect(&client, &connectParams); 00279 if(SUCCESS != rc) { 00280 IOT_ERROR("Error(%d) connecting to %s:%d", rc, mqttInitParams.pHostURL, mqttInitParams.port); 00281 return rc; 00282 } 00283 /* 00284 * Enable Auto Reconnect functionality. 
Minimum and Maximum time of Exponential backoff are set in aws_iot_config.h 00285 * #AWS_IOT_MQTT_MIN_RECONNECT_WAIT_INTERVAL 00286 * #AWS_IOT_MQTT_MAX_RECONNECT_WAIT_INTERVAL 00287 */ 00288 rc = aws_iot_mqtt_autoreconnect_set_status(&client, true); 00289 if(SUCCESS != rc) { 00290 IOT_ERROR("Unable to set Auto Reconnect to true - %d", rc); 00291 return rc; 00292 } 00293 00294 char topicToSubscribeGetPending[MAX_JOB_TOPIC_LENGTH_BYTES]; 00295 char topicToSubscribeNotifyNext[MAX_JOB_TOPIC_LENGTH_BYTES]; 00296 char topicToSubscribeGetNext[MAX_JOB_TOPIC_LENGTH_BYTES]; 00297 char topicToSubscribeUpdateAccepted[MAX_JOB_TOPIC_LENGTH_BYTES]; 00298 char topicToSubscribeUpdateRejected[MAX_JOB_TOPIC_LENGTH_BYTES]; 00299 00300 char topicToPublishGetPending[MAX_JOB_TOPIC_LENGTH_BYTES]; 00301 char topicToPublishGetNext[MAX_JOB_TOPIC_LENGTH_BYTES]; 00302 00303 rc = aws_iot_jobs_subscribe_to_job_messages( 00304 &client, QOS0, AWS_IOT_MY_THING_NAME, NULL, JOB_GET_PENDING_TOPIC, JOB_WILDCARD_REPLY_TYPE, 00305 iot_get_pending_callback_handler, NULL, topicToSubscribeGetPending, sizeof(topicToSubscribeGetPending)); 00306 00307 if(SUCCESS != rc) { 00308 IOT_ERROR("Error subscribing JOB_GET_PENDING_TOPIC: %d ", rc); 00309 return rc; 00310 } 00311 00312 rc = aws_iot_jobs_subscribe_to_job_messages( 00313 &client, QOS0, AWS_IOT_MY_THING_NAME, NULL, JOB_NOTIFY_NEXT_TOPIC, JOB_REQUEST_TYPE, 00314 iot_next_job_callback_handler, NULL, topicToSubscribeNotifyNext, sizeof(topicToSubscribeNotifyNext)); 00315 00316 if(SUCCESS != rc) { 00317 IOT_ERROR("Error subscribing JOB_NOTIFY_NEXT_TOPIC: %d ", rc); 00318 return rc; 00319 } 00320 00321 rc = aws_iot_jobs_subscribe_to_job_messages( 00322 &client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_NEXT, JOB_DESCRIBE_TOPIC, JOB_WILDCARD_REPLY_TYPE, 00323 iot_next_job_callback_handler, NULL, topicToSubscribeGetNext, sizeof(topicToSubscribeGetNext)); 00324 00325 if(SUCCESS != rc) { 00326 IOT_ERROR("Error subscribing JOB_DESCRIBE_TOPIC ($next): %d ", rc); 00327 return 
rc; 00328 } 00329 00330 rc = aws_iot_jobs_subscribe_to_job_messages( 00331 &client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_WILDCARD, JOB_UPDATE_TOPIC, JOB_ACCEPTED_REPLY_TYPE, 00332 iot_update_accepted_callback_handler, NULL, topicToSubscribeUpdateAccepted, sizeof(topicToSubscribeUpdateAccepted)); 00333 00334 if(SUCCESS != rc) { 00335 IOT_ERROR("Error subscribing JOB_UPDATE_TOPIC/accepted: %d ", rc); 00336 return rc; 00337 } 00338 00339 rc = aws_iot_jobs_subscribe_to_job_messages( 00340 &client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_WILDCARD, JOB_UPDATE_TOPIC, JOB_REJECTED_REPLY_TYPE, 00341 iot_update_rejected_callback_handler, NULL, topicToSubscribeUpdateRejected, sizeof(topicToSubscribeUpdateRejected)); 00342 00343 if(SUCCESS != rc) { 00344 IOT_ERROR("Error subscribing JOB_UPDATE_TOPIC/rejected: %d ", rc); 00345 return rc; 00346 } 00347 00348 paramsQOS0.qos = QOS0; 00349 paramsQOS0.payload = (void *) cPayload; 00350 paramsQOS0.isRetained = 0; 00351 paramsQOS0.payloadLen = strlen(cPayload); 00352 00353 rc = aws_iot_jobs_send_query(&client, QOS0, AWS_IOT_MY_THING_NAME, NULL, NULL, topicToPublishGetPending, sizeof(topicToPublishGetPending), NULL, 0, JOB_GET_PENDING_TOPIC); 00354 00355 AwsIotDescribeJobExecutionRequest describeRequest; 00356 describeRequest.executionNumber = 0; 00357 describeRequest.includeJobDocument = true; 00358 describeRequest.clientToken = NULL; 00359 00360 rc = aws_iot_jobs_describe(&client, QOS0, AWS_IOT_MY_THING_NAME, JOB_ID_NEXT, &describeRequest, topicToPublishGetNext, sizeof(topicToPublishGetNext), NULL, 0); 00361 00362 while(SUCCESS == rc) { 00363 //Max time the yield function will wait for read messages 00364 rc = aws_iot_mqtt_yield(&client, 50000); 00365 } 00366 00367 return rc; 00368 }
Generated on Tue Jul 12 2022 19:02:38 by
1.7.2