Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers ping_nb.c Source File

ping_nb.c

00001 /*******************************************************************************
00002  * Copyright (c) 2014 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Sergio R. Caprile
00015  *******************************************************************************/
00016 
00017 #include <stdio.h>
00018 #include <string.h>
00019 #include <stdlib.h>
00020 
00021 #include "MQTTPacket.h"
00022 #include "transport.h"
00023 
00024 #define KEEPALIVE_INTERVAL 20
00025 
00026 /* This is to get a timebase in seconds to test the sample */
00027 #include <time.h>
00028 time_t old_t;
00029 void start_ping_timer(void)
00030 {
00031     time(&old_t);
00032     old_t += KEEPALIVE_INTERVAL/2 + 1;
00033 }
00034 
00035 int time_to_ping(void)
00036 {
00037 time_t t;
00038 
00039     time(&t);
00040     if(t >= old_t)
00041         return 1;
00042     return 0;
00043 }
00044 
00045 /* This is in order to get an asynchronous signal to stop the sample,
00046 as the code loops sending keep-alive pings and waiting for the broker's replies.
00047 Your actual code will depend on your hw and approach, but this sample can be
00048 run on Linux so debugging of the non-hardware specific bare metal code is easier.
00049 See at bottom of file for details */
00050 #include <signal.h>
00051 
/* Stop request flag: set to 1 by the signal handler cfinish() and polled by
   the main loop. It is written from a signal handler, so it must be a
   volatile sig_atomic_t; any other object type is not guaranteed to be
   accessed safely from asynchronous signal context. */
volatile sig_atomic_t toStop = 0;

/* Installs the signal handlers that raise toStop (defined near the end of the file). */
void stop_init(void);
00055 /* */
00056 
00057 /* Same as above, we provide a set of functions to test/debug on a friendlier system;
00058 the init() and  close() actions on the serial are just for this, you will probably
00059 handle this on whatever handles your media in your application */
00060 void sampleserial_init(void);  /* opens the TCP socket that simulates the serial link; exits the process on failure */
00061 void sampleserial_close(void);  /* closes the socket opened by sampleserial_init() */
00062 int samplesend(unsigned char *address, unsigned int bytes);  /* returns bytes written (may be fewer than requested), 0 when busy, -1 on error */
00063 int samplerecv(unsigned char *address, unsigned int maxbytes);  /* returns bytes read (may be fewer than maxbytes), 0 when no data, -1 on error */
00064 /* */
00065 
00066 /* You will use your hardware specifics here, see transport.h. */
00067 static transport_iofunctions_t iof = {samplesend, samplerecv};
00068 
00069 enum states { IDLE, SENDPING, GETPONG };
00070 
00071 int main(int argc, char *argv[])
00072 {
00073     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
00074     int rc = 0;
00075     int mysock = 0;
00076     unsigned char buf[200];
00077     int buflen = sizeof(buf);
00078     int len = 0;
00079     MQTTTransport mytransport;
00080     int state;
00081 
00082     stop_init();
00083     sampleserial_init();
00084 
00085     mysock = transport_open(&iof);
00086     if(mysock < 0)
00087         return mysock;
00088     /* You will (or already are) 'somehow' connect(ed) to host:port via your hardware specifics. E.g.:
00089         you have a serial (RS-232/UART) link
00090         you have a cell modem and you issue your AT+ magic
00091         you have some TCP/IP which is not lwIP (nor a full-fledged socket compliant one)
00092          and you TCP connect
00093     */
00094 
00095     mytransport.sck = &mysock;
00096     mytransport.getfn = transport_getdatanb;
00097     mytransport.state = 0;
00098     data.clientID.cstring = "me";
00099     data.keepAliveInterval = KEEPALIVE_INTERVAL;
00100     data.cleansession = 1;
00101     data.username.cstring = "testuser";
00102     data.password.cstring = "testpassword";
00103 
00104     len = MQTTSerialize_connect(buf, buflen, &data);
00105     /* This one blocks until it finishes sending, you will probably not want this in real life,
00106     in such a case replace this call by a scheme similar to the one you'll see in the main loop */
00107     rc = transport_sendPacketBuffer(mysock, buf, len);
00108 
00109     printf("Sent MQTT connect\n");
00110     /* wait for connack */
00111     do {
00112         int frc;
00113         if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == CONNACK){
00114             unsigned char sessionPresent, connack_rc;
00115             if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0){
00116                 printf("Unable to connect, return code %d\n", connack_rc);
00117                 goto exit;
00118             }
00119             break;
00120         }
00121         else if (frc == -1)
00122             goto exit;
00123     } while (1); /* handle timeouts here */
00124 
00125     printf("MQTT connected\n");
00126     start_ping_timer();
00127     state = IDLE;
00128     while (!toStop) {
00129         switch(state){
00130         case IDLE:
00131             if(time_to_ping()){
00132                 len = MQTTSerialize_pingreq(buf, buflen);
00133                 transport_sendPacketBuffernb_start(mysock, buf, len);
00134                 state = SENDPING;
00135             }
00136             break;
00137         case SENDPING:
00138             switch(transport_sendPacketBuffernb(mysock)){
00139             case TRANSPORT_DONE:
00140                 printf("Ping...");
00141                 start_ping_timer();
00142                 state = GETPONG;
00143                 break;
00144             case TRANSPORT_ERROR:
00145                 /* handle any I/O errors here */
00146                 goto exit;
00147                 break;
00148             case TRANSPORT_AGAIN:
00149             default:
00150                 /* handle timeouts here, not probable unless there is a hardware problem */
00151                 break;
00152             }
00153             break;
00154         case GETPONG:
00155             if((rc=MQTTPacket_readnb(buf, buflen, &mytransport)) == PINGRESP){
00156                 printf("Pong\n");
00157                 start_ping_timer();
00158                 state = IDLE;
00159             } else if(rc == -1){
00160                 /* handle I/O errors here */
00161                 printf("OOPS\n");
00162                 goto exit;
00163             }   /* handle timeouts here */
00164             break;
00165         }
00166     }
00167 
00168     printf("disconnecting\n");
00169     len = MQTTSerialize_disconnect(buf, buflen);
00170     /* Same blocking related stuff here */
00171     rc = transport_sendPacketBuffer(mysock, buf, len);
00172 
00173 exit:
00174     transport_close(mysock);
00175     
00176     sampleserial_close();
00177     return 0;
00178 }
00179 
00180 
00181 /* To stop the sample */
00182 void cfinish(int sig)
00183 {
00184     signal(SIGINT, NULL);
00185     toStop = 1;
00186 }
00187 
00188 void stop_init(void)
00189 {
00190     signal(SIGINT, cfinish);
00191     signal(SIGTERM, cfinish);
00192 }
00193 
00194 /* Serial hack:
00195 Simulate serial transfers on an established TCP connection
00196  */
00197 #include <unistd.h>
00198 #include <errno.h>
00199 #include <sys/types.h>
00200 #include <sys/socket.h>
00201 #include <netinet/in.h>
00202 #include <arpa/inet.h> 
00203 #include <fcntl.h>
00204 
/* Descriptor of the TCP socket that stands in for the serial port. */
static int sockfd;

/**
 * Open the simulated serial link.
 *
 * Creates a TCP socket, connects it to 198.41.30.241:1883 and then switches
 * it to non-blocking mode. The process exits on any failure: status 2 when
 * the socket cannot be created or configured, status -1 when the connection
 * is refused.
 */
void sampleserial_init(void)
{
    struct sockaddr_in serv_addr;
    int flags;

    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror(NULL);
        exit(2);
    }

    /* Zero the whole structure first so no member is passed to connect()
       with an indeterminate value. */
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr("198.41.30.241");
    serv_addr.sin_port = htons(1883);
    if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        printf("ERROR connecting\n");
        close(sockfd);
        exit(-1);
    }
    printf("- TCP Connected to Eclipse\n");

    /* set to non-blocking; a failed F_GETFL must not be OR-ed into F_SETFL */
    flags = fcntl(sockfd, F_GETFL);
    if (flags < 0 || fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl");
        close(sockfd);
        exit(2);
    }
}
00227 
00228 void sampleserial_close(void)
00229 {
00230     close(sockfd);
00231 }
00232 
00233 int samplesend(unsigned char *address, unsigned int bytes)
00234 {
00235 int len;
00236 
00237     if(rand() > (RAND_MAX/2))   // 50% probability of being busy
00238         return 0;
00239     if(rand() > (RAND_MAX/2)){  // 50% probability of sending half the requested data (no room in buffer)
00240         if(bytes > 1)
00241             bytes /= 2;
00242     }
00243     if((len = write(sockfd, address, bytes)) >= 0)
00244         return len;
00245     if(errno == EAGAIN)
00246         return 0;
00247     return -1;
00248 }
00249 
00250 int samplerecv(unsigned char *address, unsigned int maxbytes)
00251 {
00252 int len;
00253 
00254     if(rand() > (RAND_MAX/2))   // 50% probability of no data
00255         return 0;
00256     if(rand() > (RAND_MAX/2)){  // 50% probability of getting half the requested data (not arrived yet)
00257         if(maxbytes > 1){
00258             maxbytes /= 2;
00259         }
00260     }
00261     if((len = read(sockfd, address, maxbytes)) >= 0)
00262         return len;
00263     if(errno == EAGAIN)
00264         return 0;
00265     return -1;
00266 }
00267