Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers pub0sub1_nb.c Source File

pub0sub1_nb.c

00001 /*******************************************************************************
00002  * Copyright (c) 2014 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Sergio R. Caprile - port to the bare metal environment
00016  *******************************************************************************/
00017 
00018 #include <stdio.h>
00019 #include <string.h>
00020 #include <stdlib.h>
00021 
00022 #include "MQTTPacket.h"
00023 #include "transport.h"
00024 
00025 /* This is in order to get an asynchronous signal to stop the sample,
00026 as the code loops waiting for msgs on the subscribed topic.
00027 Your actual code will depend on your hw and approach, but this sample can be
00028 run on Linux so debugging of the non-hardware specific bare metal code is easier.
00029 See at bottom of file for details */
00030 #include <signal.h>
00031 
00032 int toStop = 0;
00033 
00034 void stop_init(void);
00035 /* */
00036 
00037 /* Same as above, we provide a set of functions to test/debug on a friendlier system;
00038 the init() and  close() actions on the serial are just for this, you will probably
00039 handle this on whatever handles your media in your application */
00040 void sampleserial_init(void);
00041 void sampleserial_close(void);
00042 int samplesend(unsigned char *address, unsigned int bytes);
00043 int samplerecv(unsigned char *address, unsigned int maxbytes);
00044 /* */
00045 
00046 /* You will use your hardware specifics here, see transport.h. */
00047 static transport_iofunctions_t iof = {samplesend, samplerecv};
00048 
00049 enum states { READING, PUBLISHING };
00050 
00051 int main(int argc, char *argv[])
00052 {
00053     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
00054     int rc = 0;
00055     int mysock = 0;
00056     unsigned char buf[200];
00057     int buflen = sizeof(buf);
00058     int msgid = 1;
00059     MQTTString topicString = MQTTString_initializer;
00060     int req_qos = 0;
00061     char* payload = "mypayload";
00062     int payloadlen = strlen(payload);
00063     int len = 0;
00064     MQTTTransport mytransport;
00065     int state = READING;
00066 
00067     stop_init();
00068     sampleserial_init();
00069 
00070     mysock = transport_open(&iof);
00071     if(mysock < 0)
00072         return mysock;
00073     /* You will (or already are) 'somehow' connect(ed) to host:port via your hardware specifics. E.g.:
00074         you have a serial (RS-232/UART) link
00075         you have a cell modem and you issue your AT+ magic
00076         you have some TCP/IP which is not lwIP (nor a full-fledged socket compliant one)
00077          and you TCP connect
00078     */
00079 
00080     mytransport.sck = &mysock;
00081     mytransport.getfn = transport_getdatanb;
00082     mytransport.state = 0;
00083     data.clientID.cstring = "me";
00084     data.keepAliveInterval = 20;
00085     data.cleansession = 1;
00086     data.username.cstring = "testuser";
00087     data.password.cstring = "testpassword";
00088 
00089     len = MQTTSerialize_connect(buf, buflen, &data);
00090     /* This one blocks until it finishes sending, you will probably not want this in real life,
00091     in such a case replace this call by a scheme similar to the one you'll see in the main loop */
00092     rc = transport_sendPacketBuffer(mysock, buf, len);
00093 
00094     printf("Sent MQTT connect\n");
00095     /* wait for connack */
00096     do {
00097         int frc;
00098         if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == CONNACK){
00099             unsigned char sessionPresent, connack_rc;
00100             if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0){
00101                 printf("Unable to connect, return code %d\n", connack_rc);
00102                 goto exit;
00103             }
00104             break;
00105         }
00106         else if (frc == -1)
00107             goto exit;
00108     } while (1); /* handle timeouts here */
00109 
00110     printf("MQTT connected\n");
00111     /* subscribe */
00112     topicString.cstring = "substopic";
00113     len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
00114 
00115     /* This is equivalent to the one above, but using the non-blocking functions. You will probably not want this in real life,
00116     in such a case replace this call by a scheme similar to the one you'll see in the main loop */
00117     transport_sendPacketBuffernb_start(mysock, buf, len);
00118     while((rc=transport_sendPacketBuffernb(mysock)) != TRANSPORT_DONE);
00119     do {
00120         int frc;
00121         if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == SUBACK){ /* wait for suback */
00122             unsigned short submsgid;
00123             int subcount;
00124             int granted_qos;
00125 
00126             rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
00127             if (granted_qos != 0){
00128                 printf("granted qos != 0, %d\n", granted_qos);
00129                 goto exit;
00130             }
00131             break;
00132         }
00133         else if (frc == -1)
00134             goto exit;
00135     } while (1); /* handle timeouts here */
00136     printf("Subscribed\n");
00137 
00138     /* loop getting msgs on subscribed topic */
00139     topicString.cstring = "pubtopic";
00140     state = READING;
00141     while (!toStop) {
00142         /* do other stuff here */
00143         switch(state){
00144         case READING:
00145             if((rc=MQTTPacket_readnb(buf, buflen, &mytransport)) == PUBLISH){
00146                 unsigned char dup;
00147                 int qos;
00148                 unsigned char retained;
00149                 unsigned short msgid;
00150                 int payloadlen_in;
00151                 unsigned char* payload_in;
00152                 int rc;
00153                 MQTTString receivedTopic;
00154     
00155                 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
00156                         &payload_in, &payloadlen_in, buf, buflen);
00157                 printf("message arrived %.*s\n", payloadlen_in, payload_in);
00158                 printf("publishing reading\n");
00159                 state = PUBLISHING;
00160                 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
00161                 transport_sendPacketBuffernb_start(mysock, buf, len);
00162             } else if(rc == -1){
00163                 /* handle I/O errors here */
00164                 goto exit;
00165             }   /* handle timeouts here */
00166             break;
00167         case PUBLISHING:
00168             switch(transport_sendPacketBuffernb(mysock)){
00169             case TRANSPORT_DONE:
00170                 printf("Published\n");
00171                 state = READING;
00172                 break;
00173             case TRANSPORT_ERROR:
00174                 /* handle any I/O errors here */
00175                 goto exit;
00176                 break;
00177             case TRANSPORT_AGAIN:
00178             default:
00179                 /* handle timeouts here, not probable unless there is a hardware problem */
00180                 break;
00181             }
00182             break;
00183         }
00184     }
00185 
00186     printf("disconnecting\n");
00187     len = MQTTSerialize_disconnect(buf, buflen);
00188     /* Same blocking related stuff here */
00189     rc = transport_sendPacketBuffer(mysock, buf, len);
00190 
00191 exit:
00192     transport_close(mysock);
00193     
00194     sampleserial_close();
00195     return 0;
00196 }
00197 
00198 
00199 /* To stop the sample */
00200 void cfinish(int sig)
00201 {
00202     signal(SIGINT, NULL);
00203     toStop = 1;
00204 }
00205 
00206 void stop_init(void)
00207 {
00208     signal(SIGINT, cfinish);
00209     signal(SIGTERM, cfinish);
00210 }
00211 
00212 /* Serial hack:
00213 Simulate serial transfers on an established TCP connection
00214  */
00215 #include <unistd.h>
00216 #include <errno.h>
00217 #include <sys/types.h>
00218 #include <sys/socket.h>
00219 #include <netinet/in.h>
00220 #include <arpa/inet.h> 
00221 #include <fcntl.h>
00222 
/* Descriptor of the TCP connection that stands in for the serial link.
   -1 while no connection is open, so that it can never be mistaken for
   descriptor 0 (standard input). */
static int sockfd = -1;

/* Opens the simulated serial link: creates a TCP socket, connects it to
   198.41.30.241 port 1883 and switches it to non-blocking mode.
   Any failure terminates the process: exit status 2 if the socket cannot be
   created or configured, -1 if the connection cannot be established. */
void sampleserial_init(void)
{
    /* zeroed so that the padding bytes of the address are not left indeterminate */
    struct sockaddr_in serv_addr = {0};
    int flags;

    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror(NULL);
        exit(2);
    }
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr("198.41.30.241");
    serv_addr.sin_port = htons(1883);
    if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        printf("ERROR connecting\n");
        close(sockfd);
        sockfd = -1;
        exit(-1);
    }
    printf("- TCP Connected to Eclipse\n");
    /* set to non-blocking; the current flags are checked first because a
       failed F_GETFL returns -1, which must not be OR-ed into the new flags */
    flags = fcntl(sockfd, F_GETFL);
    if (flags < 0 || fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror(NULL);
        close(sockfd);
        sockfd = -1;
        exit(2);
    }
}
00245 
00246 void sampleserial_close(void)
00247 {
00248     close(sockfd);
00249 }
00250 
00251 int samplesend(unsigned char *address, unsigned int bytes)
00252 {
00253 int len;
00254 
00255     if(rand() > (RAND_MAX/2))   // 50% probability of being busy
00256         return 0;
00257     if(rand() > (RAND_MAX/2)){  // 50% probability of sending half the requested data (no room in buffer)
00258         if(bytes > 1)
00259             bytes /= 2;
00260     }
00261     if((len = write(sockfd, address, bytes)) >= 0)
00262         return len;
00263     if(errno == EAGAIN)
00264         return 0;
00265     return -1;
00266 }
00267 
00268 int samplerecv(unsigned char *address, unsigned int maxbytes)
00269 {
00270 int len;
00271 
00272     if(rand() > (RAND_MAX/2))   // 50% probability of no data
00273         return 0;
00274     if(rand() > (RAND_MAX/2)){  // 50% probability of getting half the requested data (not arrived yet)
00275         if(maxbytes > 1){
00276             maxbytes /= 2;
00277         }
00278     }
00279     if((len = read(sockfd, address, maxbytes)) >= 0)
00280         return len;
00281     if(errno == EAGAIN)
00282         return 0;
00283     return -1;
00284 }
00285