Simple MQTT client that uses the MQTTPacket library to connect to a public broker and publish a series of messages. Runs over Ethernet.

Dependencies:   C12832_lcd EthernetInterface MQTTPacket mbed-rtos mbed

Committer:
dvn
Date:
Wed Jun 13 19:51:38 2018 +0000
Revision:
0:96a5aefac643
Child:
1:d7773c5860c2
init mqtt message handling

Who changed what in which revision?

User | Revision | Line number | New contents of line
dvn 0:96a5aefac643 1 #include "MQTTPacket.h"
dvn 0:96a5aefac643 2
dvn 0:96a5aefac643 3 #include <errno.h>
dvn 0:96a5aefac643 4 #include <stdlib.h>
dvn 0:96a5aefac643 5 #include <string.h>
dvn 0:96a5aefac643 6
dvn 0:96a5aefac643 7 #include "EthernetInterface.h"
dvn 0:96a5aefac643 8
dvn 0:96a5aefac643 9
dvn 0:96a5aefac643 10 TCPSocketConnection mysock;
dvn 0:96a5aefac643 11
dvn 0:96a5aefac643 12 int getdata(char* buf, int count)
dvn 0:96a5aefac643 13 {
dvn 0:96a5aefac643 14 return mysock.receive(buf, (size_t)count);
dvn 0:96a5aefac643 15 }
dvn 0:96a5aefac643 16
dvn 0:96a5aefac643 17 int toStop = 0;
dvn 0:96a5aefac643 18
dvn 0:96a5aefac643 19
dvn 0:96a5aefac643 20 int main()
dvn 0:96a5aefac643 21 {
dvn 0:96a5aefac643 22 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
dvn 0:96a5aefac643 23 int rc = 0;
dvn 0:96a5aefac643 24 char buf[200];
dvn 0:96a5aefac643 25 int buflen = sizeof(buf);
dvn 0:96a5aefac643 26 int msgid = 1;
dvn 0:96a5aefac643 27 MQTTString topicString = MQTTString_initializer;
dvn 0:96a5aefac643 28 int req_qos = 0;
dvn 0:96a5aefac643 29 char* payload = "mypayload";
dvn 0:96a5aefac643 30 int payloadlen = strlen(payload);
dvn 0:96a5aefac643 31 int len = 0;
dvn 0:96a5aefac643 32 EthernetInterface eth;
dvn 0:96a5aefac643 33
dvn 0:96a5aefac643 34 eth.init(); //Use DHCP
dvn 0:96a5aefac643 35 eth.connect();
dvn 0:96a5aefac643 36
dvn 0:96a5aefac643 37 rc = mysock.connect("m2m.eclipse.org", 1883);
dvn 0:96a5aefac643 38
dvn 0:96a5aefac643 39 data.clientID.cstring = "SendReceive mbed MQTT ";
dvn 0:96a5aefac643 40 data.keepAliveInterval = 20;
dvn 0:96a5aefac643 41 data.cleansession = 1;
dvn 0:96a5aefac643 42
dvn 0:96a5aefac643 43 mysock.set_blocking(true, 1000); /* 1 second Timeout */
dvn 0:96a5aefac643 44
dvn 0:96a5aefac643 45 len = MQTTSerialize_connect(buf, buflen, &data);
dvn 0:96a5aefac643 46 rc = mysock.send(buf, len);
dvn 0:96a5aefac643 47
dvn 0:96a5aefac643 48 /* wait for connack */
dvn 0:96a5aefac643 49 if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
dvn 0:96a5aefac643 50 {
dvn 0:96a5aefac643 51 int connack_rc;
dvn 0:96a5aefac643 52
dvn 0:96a5aefac643 53 if (MQTTDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0)
dvn 0:96a5aefac643 54 {
dvn 0:96a5aefac643 55 printf("Unable to connect, return code %d\n", connack_rc);
dvn 0:96a5aefac643 56 goto exit;
dvn 0:96a5aefac643 57 }
dvn 0:96a5aefac643 58 }
dvn 0:96a5aefac643 59 else
dvn 0:96a5aefac643 60 goto exit;
dvn 0:96a5aefac643 61
dvn 0:96a5aefac643 62 /* subscribe */
dvn 0:96a5aefac643 63 topicString.cstring = "substopic";
dvn 0:96a5aefac643 64 len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
dvn 0:96a5aefac643 65
dvn 0:96a5aefac643 66 rc = mysock.send(buf, len);
dvn 0:96a5aefac643 67 if (MQTTPacket_read(buf, buflen, getdata) == SUBACK) /* wait for suback */
dvn 0:96a5aefac643 68 {
dvn 0:96a5aefac643 69 int submsgid;
dvn 0:96a5aefac643 70 int subcount;
dvn 0:96a5aefac643 71 int granted_qos;
dvn 0:96a5aefac643 72
dvn 0:96a5aefac643 73 rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
dvn 0:96a5aefac643 74 if (granted_qos != 0)
dvn 0:96a5aefac643 75 {
dvn 0:96a5aefac643 76 printf("granted qos != 0, %d\n", granted_qos);
dvn 0:96a5aefac643 77 goto exit;
dvn 0:96a5aefac643 78 }
dvn 0:96a5aefac643 79 }
dvn 0:96a5aefac643 80 else
dvn 0:96a5aefac643 81 goto exit;
dvn 0:96a5aefac643 82
dvn 0:96a5aefac643 83 topicString.cstring = "pubtopic";
dvn 0:96a5aefac643 84 while (!toStop)
dvn 0:96a5aefac643 85 {
dvn 0:96a5aefac643 86 if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
dvn 0:96a5aefac643 87 {
dvn 0:96a5aefac643 88 int dup;
dvn 0:96a5aefac643 89 int qos;
dvn 0:96a5aefac643 90 int retained;
dvn 0:96a5aefac643 91 int msgid;
dvn 0:96a5aefac643 92 int payloadlen_in;
dvn 0:96a5aefac643 93 char* payload_in;
dvn 0:96a5aefac643 94 int rc;
dvn 0:96a5aefac643 95 MQTTString receivedTopic;
dvn 0:96a5aefac643 96
dvn 0:96a5aefac643 97 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
dvn 0:96a5aefac643 98 &payload_in, &payloadlen_in, buf, buflen);
dvn 0:96a5aefac643 99 printf("message arrived %.*s\n", payloadlen_in, payload_in);
dvn 0:96a5aefac643 100 }
dvn 0:96a5aefac643 101
dvn 0:96a5aefac643 102 printf("publishing reading\n");
dvn 0:96a5aefac643 103 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
dvn 0:96a5aefac643 104 rc = mysock.send(buf, len);
dvn 0:96a5aefac643 105 }
dvn 0:96a5aefac643 106
dvn 0:96a5aefac643 107 printf("disconnecting\n");
dvn 0:96a5aefac643 108 len = MQTTSerialize_disconnect(buf, buflen);
dvn 0:96a5aefac643 109 rc = mysock.send(buf, len);
dvn 0:96a5aefac643 110
dvn 0:96a5aefac643 111 exit:
dvn 0:96a5aefac643 112 eth.disconnect();
dvn 0:96a5aefac643 113
dvn 0:96a5aefac643 114 return 0;
dvn 0:96a5aefac643 115 }