Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
pub0sub1_nb.c
00001 /******************************************************************************* 00002 * Copyright (c) 2014 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Sergio R. Caprile - clarifications and/or documentation extension 00016 *******************************************************************************/ 00017 00018 #include <stdio.h> 00019 #include <string.h> 00020 #include <stdlib.h> 00021 00022 #include "MQTTPacket.h" 00023 #include "transport.h" 00024 00025 /* This is in order to get an asynchronous signal to stop the sample, 00026 as the code loops waiting for msgs on the subscribed topic. 
00027 Your actual code will depend on your hw and approach*/ 00028 #include <signal.h> 00029 00030 int toStop = 0; 00031 00032 void cfinish(int sig) 00033 { 00034 signal(SIGINT, NULL); 00035 toStop = 1; 00036 } 00037 00038 void stop_init(void) 00039 { 00040 signal(SIGINT, cfinish); 00041 signal(SIGTERM, cfinish); 00042 } 00043 /* */ 00044 00045 int main(int argc, char *argv[]) 00046 { 00047 MQTTPacket_connectData data = MQTTPacket_connectData_initializer; 00048 int rc = 0; 00049 int mysock = 0; 00050 unsigned char buf[200]; 00051 int buflen = sizeof(buf); 00052 int msgid = 1; 00053 MQTTString topicString = MQTTString_initializer; 00054 int req_qos = 0; 00055 char* payload = "mypayload"; 00056 int payloadlen = strlen(payload); 00057 int len = 0; 00058 char *host = "m2m.eclipse.org"; 00059 int port = 1883; 00060 MQTTTransport mytransport; 00061 00062 stop_init(); 00063 if (argc > 1) 00064 host = argv[1]; 00065 00066 if (argc > 2) 00067 port = atoi(argv[2]); 00068 00069 mysock = transport_open(host, port); 00070 if(mysock < 0) 00071 return mysock; 00072 00073 printf("Sending to hostname %s port %d\n", host, port); 00074 00075 mytransport.sck = &mysock; 00076 mytransport.getfn = transport_getdatanb; 00077 mytransport.state = 0; 00078 data.clientID.cstring = "me"; 00079 data.keepAliveInterval = 20; 00080 data.cleansession = 1; 00081 data.username.cstring = "testuser"; 00082 data.password.cstring = "testpassword"; 00083 00084 len = MQTTSerialize_connect(buf, buflen, &data); 00085 rc = transport_sendPacketBuffer(mysock, buf, len); 00086 00087 /* wait for connack */ 00088 if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK) 00089 { 00090 unsigned char sessionPresent, connack_rc; 00091 00092 if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) 00093 { 00094 printf("Unable to connect, return code %d\n", connack_rc); 00095 goto exit; 00096 } 00097 } 00098 else 00099 goto exit; 00100 00101 /* subscribe */ 00102 
topicString.cstring = "substopic"; 00103 len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos); 00104 00105 rc = transport_sendPacketBuffer(mysock, buf, len); 00106 do { 00107 int frc; 00108 if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == SUBACK) /* wait for suback */ 00109 { 00110 unsigned short submsgid; 00111 int subcount; 00112 int granted_qos; 00113 00114 rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen); 00115 if (granted_qos != 0) 00116 { 00117 printf("granted qos != 0, %d\n", granted_qos); 00118 goto exit; 00119 } 00120 break; 00121 } 00122 else if (frc == -1) 00123 goto exit; 00124 } while (1); /* handle timeouts here */ 00125 /* loop getting msgs on subscribed topic */ 00126 topicString.cstring = "pubtopic"; 00127 while (!toStop) 00128 { 00129 /* handle timeouts */ 00130 if (MQTTPacket_readnb(buf, buflen, &mytransport) == PUBLISH) 00131 { 00132 unsigned char dup; 00133 int qos; 00134 unsigned char retained; 00135 unsigned short msgid; 00136 int payloadlen_in; 00137 unsigned char* payload_in; 00138 int rc; 00139 MQTTString receivedTopic; 00140 00141 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic, 00142 &payload_in, &payloadlen_in, buf, buflen); 00143 printf("message arrived %.*s\n", payloadlen_in, payload_in); 00144 printf("publishing reading\n"); 00145 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen); 00146 rc = transport_sendPacketBuffer(mysock, buf, len); 00147 } 00148 } 00149 00150 printf("disconnecting\n"); 00151 len = MQTTSerialize_disconnect(buf, buflen); 00152 rc = transport_sendPacketBuffer(mysock, buf, len); 00153 00154 exit: 00155 transport_close(mysock); 00156 00157 return 0; 00158 }
Generated on Wed Jul 13 2022 10:46:03 by
1.7.2