Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
pub0sub1_nb.c
00001 /******************************************************************************* 00002 * Copyright (c) 2014 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Sergio R. Caprile - port to the bare metal environment 00016 *******************************************************************************/ 00017 00018 #include <stdio.h> 00019 #include <string.h> 00020 #include <stdlib.h> 00021 00022 #include "MQTTPacket.h" 00023 #include "transport.h" 00024 00025 /* This is in order to get an asynchronous signal to stop the sample, 00026 as the code loops waiting for msgs on the subscribed topic. 00027 Your actual code will depend on your hw and approach, but this sample can be 00028 run on Linux so debugging of the non-hardware specific bare metal code is easier. 
00029 See at bottom of file for details */ 00030 #include <signal.h> 00031 00032 int toStop = 0; 00033 00034 void stop_init(void); 00035 /* */ 00036 00037 /* Same as above, we provide a set of functions to test/debug on a friendlier system; 00038 the init() and close() actions on the serial are just for this, you will probably 00039 handle this on whatever handles your media in your application */ 00040 void sampleserial_init(void); 00041 void sampleserial_close(void); 00042 int samplesend(unsigned char *address, unsigned int bytes); 00043 int samplerecv(unsigned char *address, unsigned int maxbytes); 00044 /* */ 00045 00046 /* You will use your hardware specifics here, see transport.h. */ 00047 static transport_iofunctions_t iof = {samplesend, samplerecv}; 00048 00049 enum states { READING, PUBLISHING }; 00050 00051 int main(int argc, char *argv[]) 00052 { 00053 MQTTPacket_connectData data = MQTTPacket_connectData_initializer; 00054 int rc = 0; 00055 int mysock = 0; 00056 unsigned char buf[200]; 00057 int buflen = sizeof(buf); 00058 int msgid = 1; 00059 MQTTString topicString = MQTTString_initializer; 00060 int req_qos = 0; 00061 char* payload = "mypayload"; 00062 int payloadlen = strlen(payload); 00063 int len = 0; 00064 MQTTTransport mytransport; 00065 int state = READING; 00066 00067 stop_init(); 00068 sampleserial_init(); 00069 00070 mysock = transport_open(&iof); 00071 if(mysock < 0) 00072 return mysock; 00073 /* You will (or already are) 'somehow' connect(ed) to host:port via your hardware specifics. 
E.g.: 00074 you have a serial (RS-232/UART) link 00075 you have a cell modem and you issue your AT+ magic 00076 you have some TCP/IP which is not lwIP (nor a full-fledged socket compliant one) 00077 and you TCP connect 00078 */ 00079 00080 mytransport.sck = &mysock; 00081 mytransport.getfn = transport_getdatanb; 00082 mytransport.state = 0; 00083 data.clientID.cstring = "me"; 00084 data.keepAliveInterval = 20; 00085 data.cleansession = 1; 00086 data.username.cstring = "testuser"; 00087 data.password.cstring = "testpassword"; 00088 00089 len = MQTTSerialize_connect(buf, buflen, &data); 00090 /* This one blocks until it finishes sending, you will probably not want this in real life, 00091 in such a case replace this call by a scheme similar to the one you'll see in the main loop */ 00092 rc = transport_sendPacketBuffer(mysock, buf, len); 00093 00094 printf("Sent MQTT connect\n"); 00095 /* wait for connack */ 00096 do { 00097 int frc; 00098 if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == CONNACK){ 00099 unsigned char sessionPresent, connack_rc; 00100 if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0){ 00101 printf("Unable to connect, return code %d\n", connack_rc); 00102 goto exit; 00103 } 00104 break; 00105 } 00106 else if (frc == -1) 00107 goto exit; 00108 } while (1); /* handle timeouts here */ 00109 00110 printf("MQTT connected\n"); 00111 /* subscribe */ 00112 topicString.cstring = "substopic"; 00113 len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos); 00114 00115 /* This is equivalent to the one above, but using the non-blocking functions. 
You will probably not want this in real life, 00116 in such a case replace this call by a scheme similar to the one you'll see in the main loop */ 00117 transport_sendPacketBuffernb_start(mysock, buf, len); 00118 while((rc=transport_sendPacketBuffernb(mysock)) != TRANSPORT_DONE); 00119 do { 00120 int frc; 00121 if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == SUBACK){ /* wait for suback */ 00122 unsigned short submsgid; 00123 int subcount; 00124 int granted_qos; 00125 00126 rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen); 00127 if (granted_qos != 0){ 00128 printf("granted qos != 0, %d\n", granted_qos); 00129 goto exit; 00130 } 00131 break; 00132 } 00133 else if (frc == -1) 00134 goto exit; 00135 } while (1); /* handle timeouts here */ 00136 printf("Subscribed\n"); 00137 00138 /* loop getting msgs on subscribed topic */ 00139 topicString.cstring = "pubtopic"; 00140 state = READING; 00141 while (!toStop) { 00142 /* do other stuff here */ 00143 switch(state){ 00144 case READING: 00145 if((rc=MQTTPacket_readnb(buf, buflen, &mytransport)) == PUBLISH){ 00146 unsigned char dup; 00147 int qos; 00148 unsigned char retained; 00149 unsigned short msgid; 00150 int payloadlen_in; 00151 unsigned char* payload_in; 00152 int rc; 00153 MQTTString receivedTopic; 00154 00155 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic, 00156 &payload_in, &payloadlen_in, buf, buflen); 00157 printf("message arrived %.*s\n", payloadlen_in, payload_in); 00158 printf("publishing reading\n"); 00159 state = PUBLISHING; 00160 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen); 00161 transport_sendPacketBuffernb_start(mysock, buf, len); 00162 } else if(rc == -1){ 00163 /* handle I/O errors here */ 00164 goto exit; 00165 } /* handle timeouts here */ 00166 break; 00167 case PUBLISHING: 00168 switch(transport_sendPacketBuffernb(mysock)){ 00169 case TRANSPORT_DONE: 00170 
printf("Published\n"); 00171 state = READING; 00172 break; 00173 case TRANSPORT_ERROR: 00174 /* handle any I/O errors here */ 00175 goto exit; 00176 break; 00177 case TRANSPORT_AGAIN: 00178 default: 00179 /* handle timeouts here, not probable unless there is a hardware problem */ 00180 break; 00181 } 00182 break; 00183 } 00184 } 00185 00186 printf("disconnecting\n"); 00187 len = MQTTSerialize_disconnect(buf, buflen); 00188 /* Same blocking related stuff here */ 00189 rc = transport_sendPacketBuffer(mysock, buf, len); 00190 00191 exit: 00192 transport_close(mysock); 00193 00194 sampleserial_close(); 00195 return 0; 00196 } 00197 00198 00199 /* To stop the sample */ 00200 void cfinish(int sig) 00201 { 00202 signal(SIGINT, NULL); 00203 toStop = 1; 00204 } 00205 00206 void stop_init(void) 00207 { 00208 signal(SIGINT, cfinish); 00209 signal(SIGTERM, cfinish); 00210 } 00211 00212 /* Serial hack: 00213 Simulate serial transfers on an established TCP connection 00214 */ 00215 #include <unistd.h> 00216 #include <errno.h> 00217 #include <sys/types.h> 00218 #include <sys/socket.h> 00219 #include <netinet/in.h> 00220 #include <arpa/inet.h> 00221 #include <fcntl.h> 00222 00223 static int sockfd; 00224 00225 void sampleserial_init(void) 00226 { 00227 struct sockaddr_in serv_addr; 00228 00229 00230 if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 00231 perror(NULL); 00232 exit(2); 00233 } 00234 serv_addr.sin_family = AF_INET; 00235 serv_addr.sin_addr.s_addr = inet_addr("198.41.30.241"); 00236 serv_addr.sin_port = htons(1883); 00237 if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { 00238 printf("ERROR connecting\n"); 00239 exit(-1); 00240 } 00241 printf("- TCP Connected to Eclipse\n"); 00242 /* set to non-blocking */ 00243 fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK); 00244 } 00245 00246 void sampleserial_close(void) 00247 { 00248 close(sockfd); 00249 } 00250 00251 int samplesend(unsigned char *address, unsigned int bytes) 00252 { 
00253 int len; 00254 00255 if(rand() > (RAND_MAX/2)) // 50% probability of being busy 00256 return 0; 00257 if(rand() > (RAND_MAX/2)){ // 50% probability of sending half the requested data (no room in buffer) 00258 if(bytes > 1) 00259 bytes /= 2; 00260 } 00261 if((len = write(sockfd, address, bytes)) >= 0) 00262 return len; 00263 if(errno == EAGAIN) 00264 return 0; 00265 return -1; 00266 } 00267 00268 int samplerecv(unsigned char *address, unsigned int maxbytes) 00269 { 00270 int len; 00271 00272 if(rand() > (RAND_MAX/2)) // 50% probability of no data 00273 return 0; 00274 if(rand() > (RAND_MAX/2)){ // 50% probability of getting half the requested data (not arrived yet) 00275 if(maxbytes > 1){ 00276 maxbytes /= 2; 00277 } 00278 } 00279 if((len = read(sockfd, address, maxbytes)) >= 0) 00280 return len; 00281 if(errno == EAGAIN) 00282 return 0; 00283 return -1; 00284 } 00285
Generated on Wed Jul 13 2022 10:46:03 by
1.7.2