Fork of my original MQTTGateway

Dependencies:   mbed-http

Committer:
vpcola
Date:
Sat Apr 08 14:43:14 2017 +0000
Revision:
0:a1734fe1ec4b
Initial commit

Who changed what in which revision?

UserRevisionLine numberNew contents of line
vpcola 0:a1734fe1ec4b 1 #include "MQTTPacket.h"
vpcola 0:a1734fe1ec4b 2
vpcola 0:a1734fe1ec4b 3 #include <errno.h>
vpcola 0:a1734fe1ec4b 4 #include <stdlib.h>
vpcola 0:a1734fe1ec4b 5 #include <string.h>
vpcola 0:a1734fe1ec4b 6
vpcola 0:a1734fe1ec4b 7 #include "EthernetInterface.h"
vpcola 0:a1734fe1ec4b 8
vpcola 0:a1734fe1ec4b 9
vpcola 0:a1734fe1ec4b 10 TCPSocketConnection mysock;
vpcola 0:a1734fe1ec4b 11
vpcola 0:a1734fe1ec4b 12 int getdata(char* buf, int count)
vpcola 0:a1734fe1ec4b 13 {
vpcola 0:a1734fe1ec4b 14 return mysock.receive(buf, (size_t)count);
vpcola 0:a1734fe1ec4b 15 }
vpcola 0:a1734fe1ec4b 16
vpcola 0:a1734fe1ec4b 17 int toStop = 0;
vpcola 0:a1734fe1ec4b 18
vpcola 0:a1734fe1ec4b 19
vpcola 0:a1734fe1ec4b 20 int main()
vpcola 0:a1734fe1ec4b 21 {
vpcola 0:a1734fe1ec4b 22 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
vpcola 0:a1734fe1ec4b 23 int rc = 0;
vpcola 0:a1734fe1ec4b 24 char buf[200];
vpcola 0:a1734fe1ec4b 25 int buflen = sizeof(buf);
vpcola 0:a1734fe1ec4b 26 int msgid = 1;
vpcola 0:a1734fe1ec4b 27 MQTTString topicString = MQTTString_initializer;
vpcola 0:a1734fe1ec4b 28 int req_qos = 0;
vpcola 0:a1734fe1ec4b 29 char* payload = "mypayload";
vpcola 0:a1734fe1ec4b 30 int payloadlen = strlen(payload);
vpcola 0:a1734fe1ec4b 31 int len = 0;
vpcola 0:a1734fe1ec4b 32 EthernetInterface eth;
vpcola 0:a1734fe1ec4b 33
vpcola 0:a1734fe1ec4b 34 eth.init(); //Use DHCP
vpcola 0:a1734fe1ec4b 35 eth.connect();
vpcola 0:a1734fe1ec4b 36
vpcola 0:a1734fe1ec4b 37 rc = mysock.connect("m2m.eclipse.org", 1883);
vpcola 0:a1734fe1ec4b 38
vpcola 0:a1734fe1ec4b 39 data.clientID.cstring = "SendReceive mbed MQTT ";
vpcola 0:a1734fe1ec4b 40 data.keepAliveInterval = 20;
vpcola 0:a1734fe1ec4b 41 data.cleansession = 1;
vpcola 0:a1734fe1ec4b 42
vpcola 0:a1734fe1ec4b 43 mysock.set_blocking(true, 1000); /* 1 second Timeout */
vpcola 0:a1734fe1ec4b 44
vpcola 0:a1734fe1ec4b 45 len = MQTTSerialize_connect(buf, buflen, &data);
vpcola 0:a1734fe1ec4b 46 rc = mysock.send(buf, len);
vpcola 0:a1734fe1ec4b 47
vpcola 0:a1734fe1ec4b 48 /* wait for connack */
vpcola 0:a1734fe1ec4b 49 if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
vpcola 0:a1734fe1ec4b 50 {
vpcola 0:a1734fe1ec4b 51 int connack_rc;
vpcola 0:a1734fe1ec4b 52
vpcola 0:a1734fe1ec4b 53 if (MQTTDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0)
vpcola 0:a1734fe1ec4b 54 {
vpcola 0:a1734fe1ec4b 55 printf("Unable to connect, return code %d\n", connack_rc);
vpcola 0:a1734fe1ec4b 56 goto exit;
vpcola 0:a1734fe1ec4b 57 }
vpcola 0:a1734fe1ec4b 58 }
vpcola 0:a1734fe1ec4b 59 else
vpcola 0:a1734fe1ec4b 60 goto exit;
vpcola 0:a1734fe1ec4b 61
vpcola 0:a1734fe1ec4b 62 /* subscribe */
vpcola 0:a1734fe1ec4b 63 topicString.cstring = "substopic";
vpcola 0:a1734fe1ec4b 64 len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
vpcola 0:a1734fe1ec4b 65
vpcola 0:a1734fe1ec4b 66 rc = mysock.send(buf, len);
vpcola 0:a1734fe1ec4b 67 if (MQTTPacket_read(buf, buflen, getdata) == SUBACK) /* wait for suback */
vpcola 0:a1734fe1ec4b 68 {
vpcola 0:a1734fe1ec4b 69 int submsgid;
vpcola 0:a1734fe1ec4b 70 int subcount;
vpcola 0:a1734fe1ec4b 71 int granted_qos;
vpcola 0:a1734fe1ec4b 72
vpcola 0:a1734fe1ec4b 73 rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
vpcola 0:a1734fe1ec4b 74 if (granted_qos != 0)
vpcola 0:a1734fe1ec4b 75 {
vpcola 0:a1734fe1ec4b 76 printf("granted qos != 0, %d\n", granted_qos);
vpcola 0:a1734fe1ec4b 77 goto exit;
vpcola 0:a1734fe1ec4b 78 }
vpcola 0:a1734fe1ec4b 79 }
vpcola 0:a1734fe1ec4b 80 else
vpcola 0:a1734fe1ec4b 81 goto exit;
vpcola 0:a1734fe1ec4b 82
vpcola 0:a1734fe1ec4b 83 topicString.cstring = "pubtopic";
vpcola 0:a1734fe1ec4b 84 while (!toStop)
vpcola 0:a1734fe1ec4b 85 {
vpcola 0:a1734fe1ec4b 86 if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
vpcola 0:a1734fe1ec4b 87 {
vpcola 0:a1734fe1ec4b 88 int dup;
vpcola 0:a1734fe1ec4b 89 int qos;
vpcola 0:a1734fe1ec4b 90 int retained;
vpcola 0:a1734fe1ec4b 91 int msgid;
vpcola 0:a1734fe1ec4b 92 int payloadlen_in;
vpcola 0:a1734fe1ec4b 93 char* payload_in;
vpcola 0:a1734fe1ec4b 94 int rc;
vpcola 0:a1734fe1ec4b 95 MQTTString receivedTopic;
vpcola 0:a1734fe1ec4b 96
vpcola 0:a1734fe1ec4b 97 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
vpcola 0:a1734fe1ec4b 98 &payload_in, &payloadlen_in, buf, buflen);
vpcola 0:a1734fe1ec4b 99 printf("message arrived %.*s\n", payloadlen_in, payload_in);
vpcola 0:a1734fe1ec4b 100 }
vpcola 0:a1734fe1ec4b 101
vpcola 0:a1734fe1ec4b 102 printf("publishing reading\n");
vpcola 0:a1734fe1ec4b 103 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
vpcola 0:a1734fe1ec4b 104 rc = mysock.send(buf, len);
vpcola 0:a1734fe1ec4b 105 }
vpcola 0:a1734fe1ec4b 106
vpcola 0:a1734fe1ec4b 107 printf("disconnecting\n");
vpcola 0:a1734fe1ec4b 108 len = MQTTSerialize_disconnect(buf, buflen);
vpcola 0:a1734fe1ec4b 109 rc = mysock.send(buf, len);
vpcola 0:a1734fe1ec4b 110
vpcola 0:a1734fe1ec4b 111 exit:
vpcola 0:a1734fe1ec4b 112 eth.disconnect();
vpcola 0:a1734fe1ec4b 113
vpcola 0:a1734fe1ec4b 114 return 0;
vpcola 0:a1734fe1ec4b 115 }