Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers pub0sub1_nb.c Source File

pub0sub1_nb.c

00001 /*******************************************************************************
00002  * Copyright (c) 2014 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Sergio R. Caprile - clarifications and/or documentation extension
00016  *******************************************************************************/
00017 
00018 #include <stdio.h>
00019 #include <string.h>
00020 #include <stdlib.h>
00021 
00022 #include "MQTTPacket.h"
00023 #include "transport.h"
00024 
00025 /* This is in order to get an asynchronous signal to stop the sample,
00026 as the code loops waiting for msgs on the subscribed topic.
00027 Your actual code will depend on your hw and approach*/
00028 #include <signal.h>
00029 
00030 int toStop = 0;
00031 
00032 void cfinish(int sig)
00033 {
00034     signal(SIGINT, NULL);
00035     toStop = 1;
00036 }
00037 
00038 void stop_init(void)
00039 {
00040     signal(SIGINT, cfinish);
00041     signal(SIGTERM, cfinish);
00042 }
00043 /* */
00044 
00045 int main(int argc, char *argv[])
00046 {
00047     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
00048     int rc = 0;
00049     int mysock = 0;
00050     unsigned char buf[200];
00051     int buflen = sizeof(buf);
00052     int msgid = 1;
00053     MQTTString topicString = MQTTString_initializer;
00054     int req_qos = 0;
00055     char* payload = "mypayload";
00056     int payloadlen = strlen(payload);
00057     int len = 0;
00058     char *host = "m2m.eclipse.org";
00059     int port = 1883;
00060     MQTTTransport mytransport;
00061 
00062     stop_init();
00063     if (argc > 1)
00064         host = argv[1];
00065 
00066     if (argc > 2)
00067         port = atoi(argv[2]);
00068 
00069     mysock = transport_open(host, port);
00070     if(mysock < 0)
00071         return mysock;
00072 
00073     printf("Sending to hostname %s port %d\n", host, port);
00074 
00075     mytransport.sck = &mysock;
00076     mytransport.getfn = transport_getdatanb;
00077     mytransport.state = 0;
00078     data.clientID.cstring = "me";
00079     data.keepAliveInterval = 20;
00080     data.cleansession = 1;
00081     data.username.cstring = "testuser";
00082     data.password.cstring = "testpassword";
00083 
00084     len = MQTTSerialize_connect(buf, buflen, &data);
00085     rc = transport_sendPacketBuffer(mysock, buf, len);
00086 
00087     /* wait for connack */
00088     if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
00089     {
00090         unsigned char sessionPresent, connack_rc;
00091 
00092         if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
00093         {
00094             printf("Unable to connect, return code %d\n", connack_rc);
00095             goto exit;
00096         }
00097     }
00098     else
00099         goto exit;
00100 
00101     /* subscribe */
00102     topicString.cstring = "substopic";
00103     len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
00104 
00105     rc = transport_sendPacketBuffer(mysock, buf, len);
00106     do {
00107         int frc;
00108         if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == SUBACK) /* wait for suback */
00109         {
00110             unsigned short submsgid;
00111             int subcount;
00112             int granted_qos;
00113 
00114             rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
00115             if (granted_qos != 0)
00116             {
00117                 printf("granted qos != 0, %d\n", granted_qos);
00118                 goto exit;
00119             }
00120             break;
00121         }
00122         else if (frc == -1)
00123             goto exit;
00124     } while (1); /* handle timeouts here */
00125     /* loop getting msgs on subscribed topic */
00126     topicString.cstring = "pubtopic";
00127     while (!toStop)
00128     {
00129         /* handle timeouts */
00130         if (MQTTPacket_readnb(buf, buflen, &mytransport) == PUBLISH)
00131         {
00132             unsigned char dup;
00133             int qos;
00134             unsigned char retained;
00135             unsigned short msgid;
00136             int payloadlen_in;
00137             unsigned char* payload_in;
00138             int rc;
00139             MQTTString receivedTopic;
00140 
00141             rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
00142                     &payload_in, &payloadlen_in, buf, buflen);
00143             printf("message arrived %.*s\n", payloadlen_in, payload_in);
00144             printf("publishing reading\n");
00145             len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
00146             rc = transport_sendPacketBuffer(mysock, buf, len);
00147         }
00148     }
00149 
00150     printf("disconnecting\n");
00151     len = MQTTSerialize_disconnect(buf, buflen);
00152     rc = transport_sendPacketBuffer(mysock, buf, len);
00153 
00154 exit:
00155     transport_close(mysock);
00156 
00157     return 0;
00158 }