Vergil Cola
/
MQTTGatewayK64
Fork of my MQTTGateway
Diff: MQTTSManager/MQTTPacket/samples/publish-subscribe.txt
- Revision:
- 0:f1d3878b8dd9
diff -r 000000000000 -r f1d3878b8dd9 MQTTSManager/MQTTPacket/samples/publish-subscribe.txt --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTTSManager/MQTTPacket/samples/publish-subscribe.txt Sat Apr 08 14:45:51 2017 +0000 @@ -0,0 +1,115 @@ +#include "MQTTPacket.h" + +#include <errno.h> +#include <stdlib.h> +#include <string.h> + +#include "EthernetInterface.h" + + +TCPSocketConnection mysock; + +int getdata(char* buf, int count) +{ + return mysock.receive(buf, (size_t)count); +} + +int toStop = 0; + + +int main() +{ + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + int rc = 0; + char buf[200]; + int buflen = sizeof(buf); + int msgid = 1; + MQTTString topicString = MQTTString_initializer; + int req_qos = 0; + char* payload = "mypayload"; + int payloadlen = strlen(payload); + int len = 0; + EthernetInterface eth; + + eth.init(); //Use DHCP + eth.connect(); + + rc = mysock.connect("m2m.eclipse.org", 1883); + + data.clientID.cstring = "SendReceive mbed MQTT "; + data.keepAliveInterval = 20; + data.cleansession = 1; + + mysock.set_blocking(true, 1000); /* 1 second Timeout */ + + len = MQTTSerialize_connect(buf, buflen, &data); + rc = mysock.send(buf, len); + + /* wait for connack */ + if (MQTTPacket_read(buf, buflen, getdata) == CONNACK) + { + int connack_rc; + + if (MQTTDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0) + { + printf("Unable to connect, return code %d\n", connack_rc); + goto exit; + } + } + else + goto exit; + + /* subscribe */ + topicString.cstring = "substopic"; + len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos); + + rc = mysock.send(buf, len); + if (MQTTPacket_read(buf, buflen, getdata) == SUBACK) /* wait for suback */ + { + int submsgid; + int subcount; + int granted_qos; + + rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen); + if (granted_qos != 0) + { + printf("granted qos != 0, %d\n", granted_qos); + goto exit; + } + } + else + goto exit; + + topicString.cstring = "pubtopic"; + while (!toStop) + { + if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH) + { + int dup; + int qos; + int retained; + int msgid; + int payloadlen_in; + char* payload_in; + int rc; + MQTTString receivedTopic; + + rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic, + &payload_in, &payloadlen_in, buf, buflen); + printf("message arrived %.*s\n", payloadlen_in, payload_in); + } + + printf("publishing reading\n"); + len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen); + rc = mysock.send(buf, len); + } + + printf("disconnecting\n"); + len = MQTTSerialize_disconnect(buf, buflen); + rc = mysock.send(buf, len); + +exit: + eth.disconnect(); + + return 0; +}