Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
test1.cpp
00001 /******************************************************************************* 00002 * Copyright (c) 2009, 2017 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial implementation for embedded C client 00015 *******************************************************************************/ 00016 00017 00018 /** 00019 * @file 00020 * Tests for the Paho embedded C "high" level client 00021 */ 00022 00023 #include <stdio.h> 00024 #include <string.h> 00025 #include <memory.h> 00026 //#define MQTT_DEBUG 00027 #include "MQTTClient.h" 00028 00029 #define DEFAULT_STACK_SIZE -1 00030 00031 #include "linux.cpp" 00032 00033 #include <sys/time.h> 00034 #include <stdlib.h> 00035 00036 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0])) 00037 00038 void usage(void) 00039 { 00040 printf("help!!\n"); 00041 exit(EXIT_FAILURE); 00042 } 00043 00044 struct Options 00045 { 00046 char* host; /**< connection to system under test. */ 00047 int port; 00048 char* proxy_host; 00049 int proxy_port; 00050 int verbose; 00051 int test_no; 00052 int MQTTVersion; 00053 int iterations; 00054 } options = 00055 { 00056 (char*)"localhost", 00057 1883, 00058 (char*)"localhost", 00059 1885, 00060 0, 00061 0, 00062 4, 00063 1, 00064 }; 00065 00066 void getopts(int argc, char** argv) 00067 { 00068 int count = 1; 00069 00070 while (count < argc) 00071 { 00072 if (strcmp(argv[count], "--test_no") == 0) 00073 { 00074 if (++count < argc) 00075 options.test_no = atoi(argv[count]); 00076 else 00077 usage(); 00078 } 00079 else if (strcmp(argv[count], "--host") == 0) 00080 { 00081 if (++count < argc) 00082 { 00083 options.host = argv[count]; 00084 printf("\nSetting host to %s\n", options.host); 00085 } 00086 else 00087 usage(); 00088 } 00089 else if (strcmp(argv[count], "--port") == 0) 00090 { 00091 if (++count < argc) 00092 { 00093 options.port = atoi(argv[count]); 00094 printf("\nSetting port to %d\n", options.port); 00095 } 00096 else 00097 usage(); 00098 } 00099 else if (strcmp(argv[count], "--proxy_host") == 0) 00100 { 00101 if (++count < argc) 00102 options.proxy_host = argv[count]; 00103 else 00104 usage(); 00105 } 00106 else if (strcmp(argv[count], "--proxy_port") == 0) 00107 { 00108 if (++count < argc) 00109 { 00110 options.proxy_port = atoi(argv[count]); 00111 printf("\nSetting proxy port to %d\n", options.proxy_port); 00112 } 00113 else 00114 usage(); 00115 } 00116 else if (strcmp(argv[count], "--MQTTversion") == 0) 00117 { 00118 if (++count < argc) 00119 { 00120 options.MQTTVersion = atoi(argv[count]); 00121 printf("setting MQTT version to %d\n", options.MQTTVersion); 00122 } 00123 else 00124 usage(); 00125 } 00126 else if (strcmp(argv[count], "--iterations") == 0) 00127 { 00128 if (++count < argc) 00129 options.iterations = atoi(argv[count]); 00130 else 00131 usage(); 00132 } 00133 else if (strcmp(argv[count], "--verbose") == 0) 00134 { 00135 options.verbose = 1; 00136 printf("\nSetting verbose on\n"); 00137 } 00138 count++; 00139 } 00140 } 00141 00142 00143 #define LOGA_DEBUG 0 00144 #define LOGA_INFO 1 00145 #include <stdarg.h> 00146 #include <time.h> 00147 #include <sys/timeb.h> 00148 void MyLog(int LOGA_level, const char* format, ...) 00149 { 00150 static char msg_buf[256]; 00151 va_list args; 00152 struct timeb ts; 00153 00154 struct tm *timeinfo; 00155 00156 if (LOGA_level == LOGA_DEBUG && options.verbose == 0) 00157 return; 00158 00159 ftime(&ts); 00160 timeinfo = localtime(&ts.time); 00161 strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo); 00162 00163 sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm); 00164 00165 va_start(args, format); 00166 vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args); 00167 va_end(args); 00168 00169 printf("%s\n", msg_buf); 00170 fflush(stdout); 00171 } 00172 00173 00174 #if defined(WIN32) || defined(_WINDOWS) 00175 #define mqsleep(A) Sleep(1000*A) 00176 #define START_TIME_TYPE DWORD 00177 static DWORD start_time = 0; 00178 START_TIME_TYPE start_clock(void) 00179 { 00180 return GetTickCount(); 00181 } 00182 #elif defined(AIX) 00183 #define mqsleep sleep 00184 #define START_TIME_TYPE struct timespec 00185 START_TIME_TYPE start_clock(void) 00186 { 00187 static struct timespec start; 00188 clock_gettime(CLOCK_REALTIME, &start); 00189 return start; 00190 } 00191 #else 00192 #define mqsleep sleep 00193 #define START_TIME_TYPE struct timeval 00194 /* TODO - unused - remove? static struct timeval start_time; */ 00195 START_TIME_TYPE start_clock(void) 00196 { 00197 struct timeval start_time; 00198 gettimeofday(&start_time, NULL); 00199 return start_time; 00200 } 00201 #endif 00202 00203 00204 #if defined(WIN32) 00205 long elapsed(START_TIME_TYPE start_time) 00206 { 00207 return GetTickCount() - start_time; 00208 } 00209 #elif defined(AIX) 00210 #define assert(a) 00211 long elapsed(struct timespec start) 00212 { 00213 struct timespec now, res; 00214 00215 clock_gettime(CLOCK_REALTIME, &now); 00216 ntimersub(now, start, res); 00217 return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L; 00218 } 00219 #else 00220 long elapsed(START_TIME_TYPE start_time) 00221 { 00222 struct timeval now, res; 00223 00224 gettimeofday(&now, NULL); 00225 timersub(&now, &start_time, &res); 00226 return (res.tv_sec)*1000 + (res.tv_usec)/1000; 00227 } 00228 #endif 00229 00230 00231 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d) 00232 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e) 00233 00234 int tests = 0; 00235 int failures = 0; 00236 FILE* xml; 00237 START_TIME_TYPE global_start_time; 00238 char output[3000]; 00239 char* cur_output = output; 00240 00241 00242 void write_test_result(void) 00243 { 00244 long duration = elapsed(global_start_time); 00245 00246 fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000); 00247 if (cur_output != output) 00248 { 00249 fprintf(xml, "%s", output); 00250 cur_output = output; 00251 } 00252 fprintf(xml, "</testcase>\n"); 00253 } 00254 00255 00256 void myassert(const char* filename, int lineno, const char* description, int value, const char* format, ...) 00257 { 00258 ++tests; 00259 if (!value) 00260 { 00261 va_list args; 00262 00263 ++failures; 00264 MyLog(LOGA_INFO, (char*)"Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description); 00265 00266 va_start(args, format); 00267 vprintf(format, args); 00268 va_end(args); 00269 00270 cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n", 00271 description, filename, lineno); 00272 } 00273 else 00274 MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description); 00275 } 00276 00277 00278 static volatile MQTT::MessageData* test1_message_data = NULL; 00279 static MQTT::Message pubmsg; 00280 00281 void messageArrived(MQTT::MessageData& md) 00282 { 00283 test1_message_data = &md; 00284 MQTT::Message &m = md.message; 00285 00286 assert("Good message lengths", pubmsg.payloadlen == m.payloadlen, 00287 "payloadlen was %d", m.payloadlen); 00288 00289 if (pubmsg.payloadlen == m.payloadlen) 00290 assert("Good message contents", memcmp(m.payload, pubmsg.payload, m.payloadlen) == 0, 00291 "payload was %s", m.payload); 00292 } 00293 00294 00295 /********************************************************************* 00296 00297 Test1: single-threaded client 00298 00299 *********************************************************************/ 00300 void test1_sendAndReceive(MQTT::Client<IPStack, Countdown, 1000>& client, int qos, const char* test_topic) 00301 { 00302 char* topicName = NULL; 00303 int topicLen; 00304 int i = 0; 00305 int iterations = 50; 00306 int rc; 00307 int wait_seconds; 00308 00309 MyLog(LOGA_DEBUG, "%d messages at QoS %d", iterations, qos); 00310 memset(&pubmsg, '\0', sizeof(pubmsg)); 00311 pubmsg.payload = (void*)"a much longer message that we can shorten to the extent that we need to payload up to 11"; 00312 pubmsg.payloadlen = 11; 00313 pubmsg.qos = (MQTT::QoS)qos; 00314 pubmsg.retained = false; 00315 pubmsg.dup = false; 00316 00317 for (i = 0; i < iterations; ++i) 00318 { 00319 test1_message_data = NULL; 00320 rc = client.publish(test_topic, pubmsg); 00321 assert("Good rc from publish", rc == MQTT::SUCCESS, "rc was %d", rc); 00322 00323 /* wait for the message to be received */ 00324 wait_seconds = 10; 00325 while ((test1_message_data == NULL) && (wait_seconds-- > 0)) 00326 { 00327 client.yield(100); 00328 } 00329 assert("Message Arrived", wait_seconds > 0, "Time out waiting for message %d\n", i); 00330 00331 if (!test1_message_data) 00332 printf("No message received within timeout period\n"); 00333 } 00334 00335 /* wait to receive any outstanding messages */ 00336 wait_seconds = 2; 00337 while (wait_seconds-- > 0) 00338 { 00339 client.yield(1000); 00340 } 00341 } 00342 00343 00344 int test1(struct Options options) 00345 { 00346 MQTT::QoS subsqos = MQTT::QOS2; 00347 int rc = 0; 00348 const char* test_topic = "C client test1"; 00349 00350 fprintf(xml, "<testcase classname=\"test1\" name=\"single threaded client using receive\""); 00351 global_start_time = start_clock(); 00352 failures = 0; 00353 MyLog(LOGA_INFO, "Starting test 1 - single threaded client using receive"); 00354 00355 IPStack ipstack = IPStack(); 00356 MQTT::Client<IPStack, Countdown, 1000> client = MQTT::Client<IPStack, Countdown, 1000>(ipstack); 00357 00358 MQTTPacket_connectData data = MQTTPacket_connectData_initializer; 00359 data.willFlag = 1; 00360 data.MQTTVersion = options.MQTTVersion; 00361 data.clientID.cstring = (char*)"single-threaded-test"; 00362 data.username.cstring = (char*)"testuser"; 00363 data.password.cstring = (char*)"testpassword"; 00364 00365 data.keepAliveInterval = 20; 00366 data.cleansession = 1; 00367 00368 data.will.message.cstring = (char*)"will message"; 00369 data.will.qos = 1; 00370 data.will.retained = 0; 00371 data.will.topicName.cstring = (char*)"will topic"; 00372 00373 MyLog(LOGA_DEBUG, "Connecting"); 00374 rc = ipstack.connect(options.host, options.port); 00375 assert("Good rc from TCP connect", rc == MQTT::SUCCESS, "rc was %d", rc); 00376 if (rc != MQTT::SUCCESS) 00377 goto exit; 00378 00379 rc = client.connect(data); 00380 assert("Good rc from connect", rc == MQTT::SUCCESS, "rc was %d", rc); 00381 if (rc != MQTT::SUCCESS) 00382 goto exit; 00383 00384 rc = client.subscribe(test_topic, subsqos, messageArrived); 00385 assert("Good rc from subscribe", rc == MQTT::SUCCESS, "rc was %d", rc); 00386 00387 test1_sendAndReceive(client, 0, test_topic); 00388 test1_sendAndReceive(client, 1, test_topic); 00389 test1_sendAndReceive(client, 2, test_topic); 00390 00391 MyLog(LOGA_DEBUG, "Stopping\n"); 00392 00393 rc = client.unsubscribe(test_topic); 00394 assert("Unsubscribe successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00395 rc = client.disconnect(); 00396 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00397 ipstack.disconnect(); 00398 00399 /* Just to make sure we can connect again */ 00400 rc = ipstack.connect(options.host, options.port); 00401 assert("TCP connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00402 rc = client.connect(data); 00403 assert("Connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00404 rc = client.disconnect(); 00405 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00406 ipstack.disconnect(); 00407 00408 exit: 00409 MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.", 00410 (failures == 0) ? "passed" : "failed", tests, failures); 00411 write_test_result(); 00412 return failures; 00413 } 00414 00415 00416 /********************************************************************* 00417 00418 Test 2: connack return data 00419 00420 *********************************************************************/ 00421 int test2(struct Options options) 00422 { 00423 MQTT::QoS subsqos = MQTT::QOS2; 00424 int rc; 00425 const char* test_topic = "C client test2"; 00426 00427 fprintf(xml, "<testcase classname=\"test2\" name=\"connack return data\""); 00428 global_start_time = start_clock(); 00429 failures = 0; 00430 MyLog(LOGA_INFO, "Starting test 2 - connack return data"); 00431 00432 IPStack ipstack = IPStack(); 00433 MQTT::Client<IPStack, Countdown, 1000> client = MQTT::Client<IPStack, Countdown, 1000>(ipstack); 00434 00435 MQTTPacket_connectData data = MQTTPacket_connectData_initializer; 00436 data.willFlag = 1; 00437 data.MQTTVersion = options.MQTTVersion; 00438 data.clientID.cstring = (char*)"connack-return-data"; 00439 data.username.cstring = (char*)"testuser"; 00440 data.password.cstring = (char*)"testpassword"; 00441 00442 data.keepAliveInterval = 20; 00443 data.cleansession = 1; 00444 00445 data.will.message.cstring = (char*)"will message"; 00446 data.will.qos = 1; 00447 data.will.retained = 0; 00448 data.will.topicName.cstring = (char*)"will topic"; 00449 00450 MyLog(LOGA_DEBUG, "Connecting"); 00451 rc = ipstack.connect(options.host, options.port); 00452 assert("Good rc from TCP connect", rc == MQTT::SUCCESS, "rc was %d", rc); 00453 if (rc != MQTT::SUCCESS) 00454 goto exit; 00455 00456 MQTT::connackData connack; 00457 rc = client.connect(data, connack); 00458 assert("Good rc from connect", rc == MQTT::SUCCESS, "rc was %d", rc); 00459 if (rc != MQTT::SUCCESS) 00460 goto exit; 00461 00462 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc); 00463 assert("Session present is 0", connack.sessionPresent == 0, 00464 "sessionPresent was %d", connack.sessionPresent); 00465 00466 rc = client.disconnect(); 00467 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00468 ipstack.disconnect(); 00469 00470 /* reconnect with cleansession false */ 00471 data.cleansession = 0; 00472 rc = ipstack.connect(options.host, options.port); 00473 assert("TCP connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00474 rc = client.connect(data, connack); 00475 assert("Connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00476 00477 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc); 00478 assert("Session present is 0", connack.sessionPresent == 0, 00479 "sessionPresent was %d", connack.sessionPresent); 00480 00481 MQTT::subackData suback; 00482 rc = client.subscribe(test_topic, subsqos, messageArrived, suback); 00483 assert("Good rc from subscribe", rc == MQTT::SUCCESS, "rc was %d", rc); 00484 assert("Granted QoS rc from subscribe", suback.grantedQoS == MQTT::QOS2, 00485 "rc was %d", suback.grantedQoS); 00486 00487 rc = client.disconnect(); 00488 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00489 ipstack.disconnect(); 00490 00491 /* reconnect with cleansession false */ 00492 data.cleansession = 0; 00493 rc = ipstack.connect(options.host, options.port); 00494 assert("TCP connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00495 rc = client.connect(data, connack); 00496 assert("Connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00497 00498 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc); 00499 assert("Session present is 1", connack.sessionPresent == 1, 00500 "sessionPresent was %d", connack.sessionPresent); 00501 00502 rc = client.disconnect(); 00503 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00504 ipstack.disconnect(); 00505 00506 /* reconnect with cleansession true */ 00507 data.cleansession = 1; 00508 rc = ipstack.connect(options.host, options.port); 00509 assert("TCP connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00510 rc = client.connect(data, connack); 00511 assert("Connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00512 00513 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc); 00514 assert("Session present is 0", connack.sessionPresent == 0, 00515 "sessionPresent was %d", connack.sessionPresent); 00516 00517 rc = client.disconnect(); 00518 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00519 ipstack.disconnect(); 00520 00521 exit: 00522 MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.", 00523 (failures == 0) ? "passed" : "failed", tests, failures); 00524 write_test_result(); 00525 return failures; 00526 } 00527 00528 00529 /********************************************************************* 00530 00531 Test 3: client session state 00532 00533 *********************************************************************/ 00534 static volatile MQTT::MessageData* test2_message_data = NULL; 00535 00536 void messageArrived2(MQTT::MessageData& md) 00537 { 00538 test2_message_data = &md; 00539 MQTT::Message &m = md.message; 00540 00541 assert("Good message lengths", pubmsg.payloadlen == m.payloadlen, 00542 "payloadlen was %d", m.payloadlen); 00543 00544 if (pubmsg.payloadlen == m.payloadlen) 00545 assert("Good message contents", memcmp(m.payload, pubmsg.payload, m.payloadlen) == 0, 00546 "payload was %s", m.payload); 00547 } 00548 00549 00550 int check_subs_exist(MQTT::Client<IPStack, Countdown, 1000>& client, const char* test_topic, int which) 00551 { 00552 int rc = MQTT::FAILURE; 00553 int wait_seconds = 0; 00554 00555 memset(&pubmsg, '\0', sizeof(pubmsg)); 00556 pubmsg.payload = (void*)"a much longer message that we can shorten to the extent that we need to payload up to 11"; 00557 pubmsg.payloadlen = 11; 00558 pubmsg.qos = MQTT::QOS2; 00559 pubmsg.retained = false; 00560 pubmsg.dup = false; 00561 00562 test1_message_data = test2_message_data = NULL; 00563 rc = client.publish(test_topic, pubmsg); 00564 assert("Good rc from publish", rc == MQTT::SUCCESS, "rc was %d", rc); 00565 00566 /* wait for the message to be received */ 00567 wait_seconds = 10; 00568 while (wait_seconds-- > 0) 00569 { 00570 client.yield(100); 00571 } 00572 00573 rc = (((which == 1 || which == 3) && test1_message_data) || 00574 (which == 2 && test1_message_data == NULL)) ? MQTT::SUCCESS : MQTT::FAILURE; 00575 assert("test1 subscription", rc == MQTT::SUCCESS, "test1_message_data %p\n", 00576 test1_message_data); 00577 rc = (((which == 2 || which == 3) && test2_message_data) || 00578 (which == 1 && test2_message_data == NULL)) ? MQTT::SUCCESS : MQTT::FAILURE; 00579 assert("test2 subscription", rc == MQTT::SUCCESS, "test2_message_data %p\n", 00580 test2_message_data); 00581 return rc; 00582 } 00583 00584 00585 int test3(struct Options options) 00586 { 00587 MQTT::QoS subsqos = MQTT::QOS2; 00588 int rc; 00589 const char* test_topic = "C client test3"; 00590 int wait_seconds = 0; 00591 00592 fprintf(xml, "<testcase classname=\"test3\" name=\"session state\""); 00593 global_start_time = start_clock(); 00594 failures = 0; 00595 MyLog(LOGA_INFO, "Starting test 3 - session state"); 00596 00597 IPStack ipstack = IPStack(); 00598 MQTT::Client<IPStack, Countdown, 1000> client = MQTT::Client<IPStack, Countdown, 1000>(ipstack); 00599 00600 MQTTPacket_connectData data = MQTTPacket_connectData_initializer; 00601 data.willFlag = 1; 00602 data.MQTTVersion = options.MQTTVersion; 00603 data.clientID.cstring = (char*)"connack-return-data"; 00604 data.username.cstring = (char*)"testuser"; 00605 data.password.cstring = (char*)"testpassword"; 00606 00607 data.keepAliveInterval = 10; 00608 data.cleansession = 1; 00609 00610 data.will.message.cstring = (char*)"will message"; 00611 data.will.qos = 1; 00612 data.will.retained = 0; 00613 data.will.topicName.cstring = (char*)"will topic"; 00614 00615 assert("Good rc in connack", client.isConnected() == false, 00616 "isconnected was %d", client.isConnected()); 00617 00618 MyLog(LOGA_DEBUG, "Connecting"); 00619 rc = ipstack.connect(options.host, options.port); 00620 assert("Good rc from TCP connect", rc == MQTT::SUCCESS, "rc was %d", rc); 00621 if (rc != MQTT::SUCCESS) 00622 goto exit; 00623 00624 MQTT::connackData connack; 00625 rc = client.connect(data, connack); 00626 assert("Good rc from connect", rc == MQTT::SUCCESS, "rc was %d", rc); 00627 if (rc != MQTT::SUCCESS) 00628 goto exit; 00629 00630 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc); 00631 assert("Session present is 0", connack.sessionPresent == 0, 00632 "sessionPresent was %d", connack.sessionPresent); 00633 00634 assert("Good rc in connack", client.isConnected() == true, 00635 "isconnected was %d", client.isConnected()); 00636 00637 rc = client.disconnect(); 00638 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00639 ipstack.disconnect(); 00640 00641 /* reconnect with cleansession false */ 00642 data.cleansession = 0; 00643 rc = ipstack.connect(options.proxy_host, options.proxy_port); 00644 assert("TCP connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00645 rc = client.connect(data, connack); 00646 assert("Connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00647 00648 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc); 00649 assert("Session present is 0", connack.sessionPresent == 0, 00650 "sessionPresent was %d", connack.sessionPresent); 00651 00652 MQTT::subackData suback; 00653 rc = client.subscribe(test_topic, subsqos, messageArrived, suback); 00654 assert("Good rc from subscribe", rc == MQTT::SUCCESS, "rc was %d", rc); 00655 assert("Granted QoS rc from subscribe", suback.grantedQoS == MQTT::QOS2, 00656 "rc was %d", suback.grantedQoS); 00657 00658 check_subs_exist(client, test_topic, 1); 00659 00660 rc = client.subscribe(test_topic, subsqos, messageArrived2, suback); 00661 assert("Good rc from subscribe", rc == MQTT::SUCCESS, "rc was %d", rc); 00662 assert("Granted QoS rc from subscribe", suback.grantedQoS == MQTT::QOS2, 00663 "rc was %d", suback.grantedQoS); 00664 00665 check_subs_exist(client, test_topic, 2); 00666 00667 rc = client.disconnect(); 00668 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00669 ipstack.disconnect(); 00670 00671 /* reconnect with cleansession false */ 00672 data.cleansession = 0; 00673 rc = ipstack.connect(options.proxy_host, options.proxy_port); 00674 assert("TCP connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00675 rc = client.connect(data, connack); 00676 assert("Connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00677 00678 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc); 00679 assert("Session present is 1", connack.sessionPresent == 1, 00680 "sessionPresent was %d", connack.sessionPresent); 00681 00682 check_subs_exist(client, test_topic, 2); 00683 00684 rc = client.subscribe(test_topic, subsqos, messageArrived, suback); 00685 assert("Good rc from subscribe", rc == MQTT::SUCCESS, "rc was %d", rc); 00686 assert("Granted QoS rc from subscribe", suback.grantedQoS == MQTT::QOS2, 00687 "rc was %d", suback.grantedQoS); 00688 00689 check_subs_exist(client, test_topic, 1); 00690 00691 // cause a connection FAILURE 00692 memset(&pubmsg, '\0', sizeof(pubmsg)); 00693 pubmsg.payload = (void*)"TERMINATE"; 00694 pubmsg.payloadlen = strlen((char*)pubmsg.payload); 00695 pubmsg.qos = MQTT::QOS0; 00696 pubmsg.retained = false; 00697 pubmsg.dup = false; 00698 rc = client.publish("MQTTSAS topic", pubmsg); 00699 assert("Good rc from publish", rc == MQTT::SUCCESS, "rc was %d", rc); 00700 00701 // wait for failure to be noticed by keepalive 00702 wait_seconds = 20; 00703 while (client.isConnected() && (wait_seconds-- > 0)) 00704 { 00705 client.yield(1000); 00706 } 00707 assert("Disconnected", !client.isConnected(), "isConnected was %d", 00708 client.isConnected()); 00709 ipstack.disconnect(); 00710 00711 /* reconnect with cleansession false */ 00712 data.cleansession = 0; 00713 rc = ipstack.connect(options.host, options.port); 00714 assert("TCP connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00715 rc = client.connect(data, connack); 00716 assert("Connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00717 00718 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc); 00719 assert("Session present is 1", connack.sessionPresent == 1, 00720 "sessionPresent was %d", connack.sessionPresent); 00721 00722 check_subs_exist(client, test_topic, 1); 00723 00724 rc = client.subscribe(test_topic, subsqos, messageArrived2, suback); 00725 assert("Good rc from subscribe", rc == MQTT::SUCCESS, "rc was %d", rc); 00726 assert("Granted QoS rc from subscribe", suback.grantedQoS == MQTT::QOS2, 00727 "rc was %d", suback.grantedQoS); 00728 00729 check_subs_exist(client, test_topic, 2); 00730 00731 rc = client.disconnect(); 00732 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00733 ipstack.disconnect(); 00734 00735 /* reconnect with cleansession true to clean up both server and client state */ 00736 data.cleansession = 1; 00737 rc = ipstack.connect(options.host, options.port); 00738 assert("TCP connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00739 rc = client.connect(data, connack); 00740 assert("Connect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00741 00742 assert("Good rc in connack", connack.rc == 0, "rc was %d", connack.rc); 00743 assert("Session present is 0", connack.sessionPresent == 0, 00744 "sessionPresent was %d", connack.sessionPresent); 00745 00746 rc = client.subscribe(test_topic, subsqos, messageArrived2, suback); 00747 assert("Good rc from subscribe", rc == MQTT::SUCCESS, "rc was %d", rc); 00748 assert("Granted QoS rc from subscribe", suback.grantedQoS == MQTT::QOS2, 00749 "rc was %d", suback.grantedQoS); 00750 00751 check_subs_exist(client, test_topic, 2); 00752 00753 rc = client.disconnect(); 00754 assert("Disconnect successful", rc == MQTT::SUCCESS, "rc was %d", rc); 00755 ipstack.disconnect(); 00756 00757 exit: 00758 MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.", 00759 (failures == 0) ? "passed" : "failed", tests, failures); 00760 write_test_result(); 00761 return failures; 00762 } 00763 00764 00765 #if 0 00766 /********************************************************************* 00767 00768 Test 4: connectionLost and will message 00769 00770 *********************************************************************/ 00771 MQTTClient test6_c1, test6_c2; 00772 volatile int test6_will_message_arrived = 0; 00773 volatile int test6_connection_lost_called = 0; 00774 00775 void test6_connectionLost(void* context, char* cause) 00776 { 00777 MQTTClient c = (MQTTClient)context; 00778 printf("%s -> Callback: connection lost\n", (c == test6_c1) ? "Client-1" : "Client-2"); 00779 test6_connection_lost_called = 1; 00780 } 00781 00782 void test6_deliveryComplete(void* context, MQTTClient_deliveryToken token) 00783 { 00784 printf("Client-2 -> Callback: publish complete for token %d\n", token); 00785 } 00786 00787 char* test6_will_topic = "C Test 2: will topic"; 00788 char* test6_will_message = "will message from Client-1"; 00789 00790 int test6_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m) 00791 { 00792 MQTTClient c = (MQTTClient)context; 00793 printf("%s -> Callback: message received on topic '%s' is '%.*s'.\n", 00794 (c == test6_c1) ? "Client-1" : "Client-2", topicName, m->payloadlen, (char*)(m->payload)); 00795 if (c == test6_c2 && strcmp(topicName, test6_will_topic) == 0 && memcmp(m->payload, test6_will_message, m->payloadlen) == 0) 00796 test6_will_message_arrived = 1; 00797 MQTTClient_free(topicName); 00798 MQTTClient_freeMessage(&m); 00799 return 1; 00800 } 00801 00802 00803 int test6(struct Options options) 00804 { 00805 char* testname = "test6"; 00806 MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer; 00807 MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer; 00808 MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer; 00809 int rc, count; 00810 char* mqttsas_topic = "MQTTSAS topic"; 00811 00812 failures = 0; 00813 MyLog(LOGA_INFO, "Starting test 6 - connectionLost and will messages"); 00814 fprintf(xml, "<testcase classname=\"test1\" name=\"connectionLost and will messages\""); 00815 global_start_time = start_clock(); 00816 00817 opts.keepAliveInterval = 2; 00818 opts.cleansession = 1; 00819 opts.MQTTVersion = MQTTVERSION_3_1_1; 00820 opts.will = &wopts; 00821 opts.will->message = test6_will_message; 00822 opts.will->qos = 1; 00823 opts.will->retained = 0; 00824 opts.will->topicName = test6_will_topic; 00825 if (options.haconnections != NULL) 00826 { 00827 opts.serverURIs = options.haconnections; 00828 opts.serverURIcount = options.hacount; 00829 } 00830 00831 /* Client-1 with Will options */ 00832 rc = MQTTClient_create(&test6_c1, options.proxy_connection, "Client_1", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL); 00833 assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc); 00834 if (rc != MQTTCLIENT_SUCCESS) 00835 goto exit; 00836 00837 rc = MQTTClient_setCallbacks(test6_c1, (void*)test6_c1, test6_connectionLost, test6_messageArrived, test6_deliveryComplete); 00838 assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc); 00839 if (rc != MQTTCLIENT_SUCCESS) 00840 goto exit; 00841 00842 /* Connect to the broker */ 00843 rc = MQTTClient_connect(test6_c1, &opts); 00844 assert("good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc); 00845 if (rc != MQTTCLIENT_SUCCESS) 00846 goto exit; 00847 00848 /* Client - 2 (multi-threaded) */ 00849 rc = MQTTClient_create(&test6_c2, options.connection, "Client_2", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL); 00850 assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc); 00851 00852 /* Set the callback functions for the client */ 00853 rc = MQTTClient_setCallbacks(test6_c2, (void*)test6_c2, test6_connectionLost, test6_messageArrived, test6_deliveryComplete); 00854 assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc); 00855 00856 /* Connect to the broker */ 00857 opts2.keepAliveInterval = 20; 00858 opts2.cleansession = 1; 00859 MyLog(LOGA_INFO, "Connecting Client_2 ..."); 00860 rc = MQTTClient_connect(test6_c2, &opts2); 00861 assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc); 00862 00863 rc = MQTTClient_subscribe(test6_c2, test6_will_topic, 2); 00864 assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc); 00865 00866 /* now send the command which will break the connection and cause the will message to be sent */ 00867 rc = MQTTClient_publish(test6_c1, mqttsas_topic, (int)strlen("TERMINATE"), "TERMINATE", 0, 0, NULL); 00868 assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc); 00869 00870 MyLog(LOGA_INFO, "Waiting to receive the will message"); 00871 count = 0; 00872 while (++count < 40) 00873 { 00874 #if defined(WIN32) 00875 Sleep(1000L); 00876 #else 00877 sleep(1); 00878 #endif 00879 if (test6_will_message_arrived == 1 && test6_connection_lost_called == 1) 00880 break; 00881 } 00882 assert("will message arrived", test6_will_message_arrived == 1, 00883 "will_message_arrived was %d\n", test6_will_message_arrived); 00884 assert("connection lost called", test6_connection_lost_called == 1, 00885 "connection_lost_called %d\n", test6_connection_lost_called); 00886 00887 rc = MQTTClient_unsubscribe(test6_c2, test6_will_topic); 00888 assert("Good rc from unsubscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); 00889 00890 rc = MQTTClient_isConnected(test6_c2); 00891 assert("Client-2 still connected", rc == 1, "isconnected is %d", rc); 00892 00893 rc = MQTTClient_isConnected(test6_c1); 00894 assert("Client-1 not connected", rc == 0, "isconnected is %d", rc); 00895 00896 rc = MQTTClient_disconnect(test6_c2, 100L); 00897 assert("Good rc from disconnect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); 00898 00899 MQTTClient_destroy(&test6_c1); 00900 MQTTClient_destroy(&test6_c2); 00901 00902 exit: 00903 MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.\n", 00904 (failures == 0) ? "passed" : "failed", testname, tests, failures); 00905 write_test_result(); 00906 return failures; 00907 } 00908 #endif 00909 00910 int main(int argc, char** argv) 00911 { 00912 int rc = 0; 00913 int (*tests[])(Options) = {NULL, test1, test2, test3, /*test4, test5, test6, test6a*/}; 00914 int i; 00915 00916 xml = fopen("TEST-test1.xml", "w"); 00917 fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1)); 00918 00919 //setenv("MQTT_C_CLIENT_TRACE", "ON", 1); 00920 //setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 0); 00921 00922 getopts(argc, argv); 00923 00924 for (i = 0; i < options.iterations; ++i) 00925 { 00926 if (options.test_no == 0) 00927 { /* run all the tests */ 00928 for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no) 00929 rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */ 00930 } 00931 else 00932 rc = tests[options.test_no](options); /* run just the selected test */ 00933 } 00934 00935 if (rc == 0) 00936 MyLog(LOGA_INFO, "verdict pass"); 00937 else 00938 MyLog(LOGA_INFO, "verdict fail"); 00939 00940 fprintf(xml, "</testsuite>\n"); 00941 fclose(xml); 00942 return rc; 00943 }
Generated on Wed Jul 13 2022 10:46:03 by
1.7.2