Added MQTTPacket_Read() in MQTTPacket.h

Fork of MQTTPacket by MQTT

Committer:
icraggs
Date:
Thu Apr 10 15:11:16 2014 +0000
Revision:
2:bc3bc0e3b764
Add samples and test

Who changed what in which revision?

User | Revision | Line number | New contents of line
icraggs 2:bc3bc0e3b764 1 #include "MQTTPacket.h"
icraggs 2:bc3bc0e3b764 2
icraggs 2:bc3bc0e3b764 3 #include <errno.h>
icraggs 2:bc3bc0e3b764 4 #include <stdlib.h>
icraggs 2:bc3bc0e3b764 5 #include <string.h>
icraggs 2:bc3bc0e3b764 6
icraggs 2:bc3bc0e3b764 7 #include "EthernetInterface.h"
icraggs 2:bc3bc0e3b764 8
icraggs 2:bc3bc0e3b764 9
icraggs 2:bc3bc0e3b764 10 TCPSocketConnection mysock;
icraggs 2:bc3bc0e3b764 11
icraggs 2:bc3bc0e3b764 12 int getdata(char* buf, int count)
icraggs 2:bc3bc0e3b764 13 {
icraggs 2:bc3bc0e3b764 14 return mysock.receive(buf, (size_t)count);
icraggs 2:bc3bc0e3b764 15 }
icraggs 2:bc3bc0e3b764 16
icraggs 2:bc3bc0e3b764 17 int toStop = 0;
icraggs 2:bc3bc0e3b764 18
icraggs 2:bc3bc0e3b764 19
icraggs 2:bc3bc0e3b764 20 int main()
icraggs 2:bc3bc0e3b764 21 {
icraggs 2:bc3bc0e3b764 22 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
icraggs 2:bc3bc0e3b764 23 int rc = 0;
icraggs 2:bc3bc0e3b764 24 char buf[200];
icraggs 2:bc3bc0e3b764 25 int buflen = sizeof(buf);
icraggs 2:bc3bc0e3b764 26 int msgid = 1;
icraggs 2:bc3bc0e3b764 27 MQTTString topicString = MQTTString_initializer;
icraggs 2:bc3bc0e3b764 28 int req_qos = 0;
icraggs 2:bc3bc0e3b764 29 char* payload = "mypayload";
icraggs 2:bc3bc0e3b764 30 int payloadlen = strlen(payload);
icraggs 2:bc3bc0e3b764 31 int len = 0;
icraggs 2:bc3bc0e3b764 32 EthernetInterface eth;
icraggs 2:bc3bc0e3b764 33
icraggs 2:bc3bc0e3b764 34 eth.init(); //Use DHCP
icraggs 2:bc3bc0e3b764 35 eth.connect();
icraggs 2:bc3bc0e3b764 36
icraggs 2:bc3bc0e3b764 37 rc = mysock.connect("m2m.eclipse.org", 1883);
icraggs 2:bc3bc0e3b764 38
icraggs 2:bc3bc0e3b764 39 data.clientID.cstring = "SendReceive mbed MQTT ";
icraggs 2:bc3bc0e3b764 40 data.keepAliveInterval = 20;
icraggs 2:bc3bc0e3b764 41 data.cleansession = 1;
icraggs 2:bc3bc0e3b764 42
icraggs 2:bc3bc0e3b764 43 mysock.set_blocking(true, 1000); /* 1 second Timeout */
icraggs 2:bc3bc0e3b764 44
icraggs 2:bc3bc0e3b764 45 len = MQTTSerialize_connect(buf, buflen, &data);
icraggs 2:bc3bc0e3b764 46 rc = mysock.send(buf, len);
icraggs 2:bc3bc0e3b764 47
icraggs 2:bc3bc0e3b764 48 /* wait for connack */
icraggs 2:bc3bc0e3b764 49 if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
icraggs 2:bc3bc0e3b764 50 {
icraggs 2:bc3bc0e3b764 51 int connack_rc;
icraggs 2:bc3bc0e3b764 52
icraggs 2:bc3bc0e3b764 53 if (MQTTDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0)
icraggs 2:bc3bc0e3b764 54 {
icraggs 2:bc3bc0e3b764 55 printf("Unable to connect, return code %d\n", connack_rc);
icraggs 2:bc3bc0e3b764 56 goto exit;
icraggs 2:bc3bc0e3b764 57 }
icraggs 2:bc3bc0e3b764 58 }
icraggs 2:bc3bc0e3b764 59 else
icraggs 2:bc3bc0e3b764 60 goto exit;
icraggs 2:bc3bc0e3b764 61
icraggs 2:bc3bc0e3b764 62 /* subscribe */
icraggs 2:bc3bc0e3b764 63 topicString.cstring = "substopic";
icraggs 2:bc3bc0e3b764 64 len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
icraggs 2:bc3bc0e3b764 65
icraggs 2:bc3bc0e3b764 66 rc = mysock.send(buf, len);
icraggs 2:bc3bc0e3b764 67 if (MQTTPacket_read(buf, buflen, getdata) == SUBACK) /* wait for suback */
icraggs 2:bc3bc0e3b764 68 {
icraggs 2:bc3bc0e3b764 69 int submsgid;
icraggs 2:bc3bc0e3b764 70 int subcount;
icraggs 2:bc3bc0e3b764 71 int granted_qos;
icraggs 2:bc3bc0e3b764 72
icraggs 2:bc3bc0e3b764 73 rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
icraggs 2:bc3bc0e3b764 74 if (granted_qos != 0)
icraggs 2:bc3bc0e3b764 75 {
icraggs 2:bc3bc0e3b764 76 printf("granted qos != 0, %d\n", granted_qos);
icraggs 2:bc3bc0e3b764 77 goto exit;
icraggs 2:bc3bc0e3b764 78 }
icraggs 2:bc3bc0e3b764 79 }
icraggs 2:bc3bc0e3b764 80 else
icraggs 2:bc3bc0e3b764 81 goto exit;
icraggs 2:bc3bc0e3b764 82
icraggs 2:bc3bc0e3b764 83 topicString.cstring = "pubtopic";
icraggs 2:bc3bc0e3b764 84 while (!toStop)
icraggs 2:bc3bc0e3b764 85 {
icraggs 2:bc3bc0e3b764 86 if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
icraggs 2:bc3bc0e3b764 87 {
icraggs 2:bc3bc0e3b764 88 int dup;
icraggs 2:bc3bc0e3b764 89 int qos;
icraggs 2:bc3bc0e3b764 90 int retained;
icraggs 2:bc3bc0e3b764 91 int msgid;
icraggs 2:bc3bc0e3b764 92 int payloadlen_in;
icraggs 2:bc3bc0e3b764 93 char* payload_in;
icraggs 2:bc3bc0e3b764 94 int rc;
icraggs 2:bc3bc0e3b764 95 MQTTString receivedTopic;
icraggs 2:bc3bc0e3b764 96
icraggs 2:bc3bc0e3b764 97 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
icraggs 2:bc3bc0e3b764 98 &payload_in, &payloadlen_in, buf, buflen);
icraggs 2:bc3bc0e3b764 99 printf("message arrived %.*s\n", payloadlen_in, payload_in);
icraggs 2:bc3bc0e3b764 100 }
icraggs 2:bc3bc0e3b764 101
icraggs 2:bc3bc0e3b764 102 printf("publishing reading\n");
icraggs 2:bc3bc0e3b764 103 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
icraggs 2:bc3bc0e3b764 104 rc = mysock.send(buf, len);
icraggs 2:bc3bc0e3b764 105 }
icraggs 2:bc3bc0e3b764 106
icraggs 2:bc3bc0e3b764 107 printf("disconnecting\n");
icraggs 2:bc3bc0e3b764 108 len = MQTTSerialize_disconnect(buf, buflen);
icraggs 2:bc3bc0e3b764 109 rc = mysock.send(buf, len);
icraggs 2:bc3bc0e3b764 110
icraggs 2:bc3bc0e3b764 111 exit:
icraggs 2:bc3bc0e3b764 112 eth.disconnect();
icraggs 2:bc3bc0e3b764 113
icraggs 2:bc3bc0e3b764 114 return 0;
icraggs 2:bc3bc0e3b764 115 }