Fork of my MQTTGateway

Dependencies:   mbed-http

Committer:
vpcola
Date:
Sat Apr 08 14:45:51 2017 +0000
Revision:
0:f1d3878b8dd9
Initial commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
vpcola 0:f1d3878b8dd9 1 #include "MQTTPacket.h"
vpcola 0:f1d3878b8dd9 2
vpcola 0:f1d3878b8dd9 3 #include <errno.h>
vpcola 0:f1d3878b8dd9 4 #include <stdlib.h>
vpcola 0:f1d3878b8dd9 5 #include <string.h>
vpcola 0:f1d3878b8dd9 6
vpcola 0:f1d3878b8dd9 7 #include "EthernetInterface.h"
vpcola 0:f1d3878b8dd9 8
vpcola 0:f1d3878b8dd9 9
vpcola 0:f1d3878b8dd9 10 TCPSocketConnection mysock;
vpcola 0:f1d3878b8dd9 11
vpcola 0:f1d3878b8dd9 12 int getdata(char* buf, int count)
vpcola 0:f1d3878b8dd9 13 {
vpcola 0:f1d3878b8dd9 14 return mysock.receive(buf, (size_t)count);
vpcola 0:f1d3878b8dd9 15 }
vpcola 0:f1d3878b8dd9 16
vpcola 0:f1d3878b8dd9 17 int toStop = 0;
vpcola 0:f1d3878b8dd9 18
vpcola 0:f1d3878b8dd9 19
vpcola 0:f1d3878b8dd9 20 int main()
vpcola 0:f1d3878b8dd9 21 {
vpcola 0:f1d3878b8dd9 22 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
vpcola 0:f1d3878b8dd9 23 int rc = 0;
vpcola 0:f1d3878b8dd9 24 char buf[200];
vpcola 0:f1d3878b8dd9 25 int buflen = sizeof(buf);
vpcola 0:f1d3878b8dd9 26 int msgid = 1;
vpcola 0:f1d3878b8dd9 27 MQTTString topicString = MQTTString_initializer;
vpcola 0:f1d3878b8dd9 28 int req_qos = 0;
vpcola 0:f1d3878b8dd9 29 char* payload = "mypayload";
vpcola 0:f1d3878b8dd9 30 int payloadlen = strlen(payload);
vpcola 0:f1d3878b8dd9 31 int len = 0;
vpcola 0:f1d3878b8dd9 32 EthernetInterface eth;
vpcola 0:f1d3878b8dd9 33
vpcola 0:f1d3878b8dd9 34 eth.init(); //Use DHCP
vpcola 0:f1d3878b8dd9 35 eth.connect();
vpcola 0:f1d3878b8dd9 36
vpcola 0:f1d3878b8dd9 37 rc = mysock.connect("m2m.eclipse.org", 1883);
vpcola 0:f1d3878b8dd9 38
vpcola 0:f1d3878b8dd9 39 data.clientID.cstring = "SendReceive mbed MQTT ";
vpcola 0:f1d3878b8dd9 40 data.keepAliveInterval = 20;
vpcola 0:f1d3878b8dd9 41 data.cleansession = 1;
vpcola 0:f1d3878b8dd9 42
vpcola 0:f1d3878b8dd9 43 mysock.set_blocking(true, 1000); /* 1 second Timeout */
vpcola 0:f1d3878b8dd9 44
vpcola 0:f1d3878b8dd9 45 len = MQTTSerialize_connect(buf, buflen, &data);
vpcola 0:f1d3878b8dd9 46 rc = mysock.send(buf, len);
vpcola 0:f1d3878b8dd9 47
vpcola 0:f1d3878b8dd9 48 /* wait for connack */
vpcola 0:f1d3878b8dd9 49 if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
vpcola 0:f1d3878b8dd9 50 {
vpcola 0:f1d3878b8dd9 51 int connack_rc;
vpcola 0:f1d3878b8dd9 52
vpcola 0:f1d3878b8dd9 53 if (MQTTDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0)
vpcola 0:f1d3878b8dd9 54 {
vpcola 0:f1d3878b8dd9 55 printf("Unable to connect, return code %d\n", connack_rc);
vpcola 0:f1d3878b8dd9 56 goto exit;
vpcola 0:f1d3878b8dd9 57 }
vpcola 0:f1d3878b8dd9 58 }
vpcola 0:f1d3878b8dd9 59 else
vpcola 0:f1d3878b8dd9 60 goto exit;
vpcola 0:f1d3878b8dd9 61
vpcola 0:f1d3878b8dd9 62 /* subscribe */
vpcola 0:f1d3878b8dd9 63 topicString.cstring = "substopic";
vpcola 0:f1d3878b8dd9 64 len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
vpcola 0:f1d3878b8dd9 65
vpcola 0:f1d3878b8dd9 66 rc = mysock.send(buf, len);
vpcola 0:f1d3878b8dd9 67 if (MQTTPacket_read(buf, buflen, getdata) == SUBACK) /* wait for suback */
vpcola 0:f1d3878b8dd9 68 {
vpcola 0:f1d3878b8dd9 69 int submsgid;
vpcola 0:f1d3878b8dd9 70 int subcount;
vpcola 0:f1d3878b8dd9 71 int granted_qos;
vpcola 0:f1d3878b8dd9 72
vpcola 0:f1d3878b8dd9 73 rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
vpcola 0:f1d3878b8dd9 74 if (granted_qos != 0)
vpcola 0:f1d3878b8dd9 75 {
vpcola 0:f1d3878b8dd9 76 printf("granted qos != 0, %d\n", granted_qos);
vpcola 0:f1d3878b8dd9 77 goto exit;
vpcola 0:f1d3878b8dd9 78 }
vpcola 0:f1d3878b8dd9 79 }
vpcola 0:f1d3878b8dd9 80 else
vpcola 0:f1d3878b8dd9 81 goto exit;
vpcola 0:f1d3878b8dd9 82
vpcola 0:f1d3878b8dd9 83 topicString.cstring = "pubtopic";
vpcola 0:f1d3878b8dd9 84 while (!toStop)
vpcola 0:f1d3878b8dd9 85 {
vpcola 0:f1d3878b8dd9 86 if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
vpcola 0:f1d3878b8dd9 87 {
vpcola 0:f1d3878b8dd9 88 int dup;
vpcola 0:f1d3878b8dd9 89 int qos;
vpcola 0:f1d3878b8dd9 90 int retained;
vpcola 0:f1d3878b8dd9 91 int msgid;
vpcola 0:f1d3878b8dd9 92 int payloadlen_in;
vpcola 0:f1d3878b8dd9 93 char* payload_in;
vpcola 0:f1d3878b8dd9 94 int rc;
vpcola 0:f1d3878b8dd9 95 MQTTString receivedTopic;
vpcola 0:f1d3878b8dd9 96
vpcola 0:f1d3878b8dd9 97 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
vpcola 0:f1d3878b8dd9 98 &payload_in, &payloadlen_in, buf, buflen);
vpcola 0:f1d3878b8dd9 99 printf("message arrived %.*s\n", payloadlen_in, payload_in);
vpcola 0:f1d3878b8dd9 100 }
vpcola 0:f1d3878b8dd9 101
vpcola 0:f1d3878b8dd9 102 printf("publishing reading\n");
vpcola 0:f1d3878b8dd9 103 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
vpcola 0:f1d3878b8dd9 104 rc = mysock.send(buf, len);
vpcola 0:f1d3878b8dd9 105 }
vpcola 0:f1d3878b8dd9 106
vpcola 0:f1d3878b8dd9 107 printf("disconnecting\n");
vpcola 0:f1d3878b8dd9 108 len = MQTTSerialize_disconnect(buf, buflen);
vpcola 0:f1d3878b8dd9 109 rc = mysock.send(buf, len);
vpcola 0:f1d3878b8dd9 110
vpcola 0:f1d3878b8dd9 111 exit:
vpcola 0:f1d3878b8dd9 112 eth.disconnect();
vpcola 0:f1d3878b8dd9 113
vpcola 0:f1d3878b8dd9 114 return 0;
vpcola 0:f1d3878b8dd9 115 }