MQTT Publish - Subscribe

Dependencies:   EthernetInterface MQTT Servo mbed-rtos mbed

Fork of MQTTPublishSubcribe by smd.iotkit2.ch

Committer:
stefan1691
Date:
Sat Apr 04 17:11:43 2015 +0000
Revision:
19:65884a321e48
Parent:
18:a04f238cfc28
Child:
20:d029102c00fa
MQTT Subscribe und Publish

Who changed what in which revision?

User | Revision | Line number | New contents of line
stefan1691 19:65884a321e48 1 /** MQTT Subscribe und Publish
stefan1691 19:65884a321e48 2 Wartet auf eine Publish Nachricht und sendet
stefan1691 19:65884a321e48 3 dann die dazugehoerende Nachricht zurueck */
stefan1691 17:46dc5a8e143d 4 #include "mbed.h"
icraggs 8:a3e3113054a1 5 #include "MQTTEthernet.h"
icraggs 2:638c854c0695 6 #include "MQTTClient.h"
icraggs 2:638c854c0695 7
stefan1691 19:65884a321e48 8 // UI: status LED on pin LED1, toggled by the loop in main()
stefan1691 19:65884a321e48 9 DigitalOut led1( LED1 );
stefan1691 19:65884a321e48 10 // Potentiometer on analog input A0 (read when a ".../poti" request arrives)
stefan1691 19:65884a321e48 11 AnalogIn poti( A0 );
stefan1691 19:65884a321e48 12 // Light sensor on analog input A1 (read when a ".../light" request arrives)
stefan1691 19:65884a321e48 13 AnalogIn light( A1 );
stefan1691 19:65884a321e48 14
stefan1691 17:46dc5a8e143d 15 // MQTT broker: host name and TCP port that main() connects to
stefan1691 17:46dc5a8e143d 16 //char* hostname = "iot.eclipse.org";
stefan1691 18:a04f238cfc28 17 char* hostname = "raspi2x";
stefan1691 17:46dc5a8e143d 18 int port = 1883;
stefan1691 19:65884a321e48 19 // Pointer to the MQTT client; assigned in main() so that publish() can reach the client object created there.
stefan1691 19:65884a321e48 20 MQTT::Client<MQTTEthernet, Countdown> *clientGlobal;
stefan1691 19:65884a321e48 21
stefan1691 19:65884a321e48 22 // Topic filter this program subscribes to (incoming requests)
stefan1691 19:65884a321e48 23 char* topicSensors = "mbed/k64f/iotkit/get/#";
stefan1691 19:65884a321e48 24 // Topics this program publishes sensor readings on
stefan1691 19:65884a321e48 25 char* topicPoti = "mbed/k64f/iotkit/post/poti";
stefan1691 19:65884a321e48 26 char* topicLight = "mbed/k64f/iotkit/post/light";
stefan1691 19:65884a321e48 27
stefan1691 19:65884a321e48 28 // Outgoing MQTT message, filled in and reused by publish()
stefan1691 19:65884a321e48 29 MQTT::Message message;
stefan1691 19:65884a321e48 30 // Shared text buffer holding the formatted payload of the outgoing message
stefan1691 19:65884a321e48 31 char buf[100];
icraggs 2:638c854c0695 32
stefan1691 19:65884a321e48 33 /** Hilfsfunktion zum Publizieren auf MQTT Broker */
stefan1691 19:65884a321e48 34 void publish( char* topic, float value )
stefan1691 19:65884a321e48 35 {
stefan1691 19:65884a321e48 36 // Message als JSON aufbereiten und senden
stefan1691 19:65884a321e48 37 sprintf( buf, "%f", value );
stefan1691 19:65884a321e48 38 printf( "Publish: %s/%s\n", topic, buf );
stefan1691 19:65884a321e48 39 message.qos = MQTT::QOS0;
stefan1691 19:65884a321e48 40 message.retained = false;
stefan1691 19:65884a321e48 41 message.dup = false;
stefan1691 19:65884a321e48 42 message.payload = (void*) buf;
stefan1691 19:65884a321e48 43 message.payloadlen = strlen(buf)+1;
stefan1691 19:65884a321e48 44 clientGlobal->publish( topic, message);
stefan1691 19:65884a321e48 45 }
stefan1691 19:65884a321e48 46
stefan1691 19:65884a321e48 47 /** Daten empfangen von MQTT Broker */
stefan1691 17:46dc5a8e143d 48 void messageArrived( MQTT::MessageData& md )
icraggs 2:638c854c0695 49 {
icraggs 9:5beb8609e9f7 50 MQTT::Message &message = md.message;
icraggs 9:5beb8609e9f7 51 printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);
stefan1691 19:65884a321e48 52 printf("Topic %.*s, ", md.topicName.lenstring.len, (char*) md.topicName.lenstring.data );
stefan1691 17:46dc5a8e143d 53 printf("Payload %.*s\n", message.payloadlen, (char*) message.payload);
stefan1691 19:65884a321e48 54
stefan1691 19:65884a321e48 55 // in C sind Zeiger auf char immer Positionsangaben, deshalb sind Additionen und Subtraktionen moeglich.
stefan1691 19:65884a321e48 56 if ( strncmp( (char*) md.topicName.lenstring.data + md.topicName.lenstring.len - 4, "poti", 4 ) == 0 )
stefan1691 19:65884a321e48 57 {
stefan1691 19:65884a321e48 58 printf( "Poti %f\n", poti.read() );
stefan1691 19:65884a321e48 59 publish( topicPoti, poti.read() );
stefan1691 19:65884a321e48 60 }
stefan1691 19:65884a321e48 61 if ( strncmp( (char*) md.topicName.lenstring.data + md.topicName.lenstring.len - 5, "light", 5) == 0 )
stefan1691 19:65884a321e48 62 {
stefan1691 19:65884a321e48 63 printf( "Light %f\n", light.read() );
stefan1691 19:65884a321e48 64 publish( topicLight, light.read() );
stefan1691 19:65884a321e48 65 }
icraggs 2:638c854c0695 66 }
icraggs 0:0cae29831d01 67
stefan1691 17:46dc5a8e143d 68 int main()
icraggs 2:638c854c0695 69 {
stefan1691 17:46dc5a8e143d 70 // Ethernet und MQTT initialisieren (muss in main erfolgen)
icraggs 8:a3e3113054a1 71 MQTTEthernet ipstack = MQTTEthernet();
icraggs 8:a3e3113054a1 72 MQTT::Client<MQTTEthernet, Countdown> client = MQTT::Client<MQTTEthernet, Countdown>(ipstack);
stefan1691 19:65884a321e48 73 // HACK um client anderen Funktionen zur Verfuegung zu stellen
stefan1691 19:65884a321e48 74 clientGlobal = &client;
icraggs 3:7a6a899de7cc 75
stefan1691 17:46dc5a8e143d 76 printf("Connecting to %s:%d\n", hostname, port);
icraggs 6:e4c690c45021 77 int rc = ipstack.connect(hostname, port);
icraggs 6:e4c690c45021 78 if (rc != 0)
stefan1691 17:46dc5a8e143d 79 printf("rc from TCP connect is %d\n", rc);
icraggs 6:e4c690c45021 80
stefan1691 17:46dc5a8e143d 81 // mit MQTT Broker verbinden
icraggs 6:e4c690c45021 82 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
icraggs 6:e4c690c45021 83 data.MQTTVersion = 3;
icraggs 8:a3e3113054a1 84 data.clientID.cstring = "mbed-sample";
icraggs 12:086a9314e8a5 85 data.username.cstring = "testuser";
icraggs 12:086a9314e8a5 86 data.password.cstring = "testpassword";
stefan1691 17:46dc5a8e143d 87 rc = client.connect(data);
stefan1691 17:46dc5a8e143d 88 printf("rc from MQTT connect is %d\n", rc);
icraggs 2:638c854c0695 89
stefan1691 19:65884a321e48 90 rc = client.subscribe( topicSensors, MQTT::QOS0, messageArrived );
stefan1691 17:46dc5a8e143d 91 printf("rc from MQTT subscribe is %d\n", rc);
icraggs 0:0cae29831d01 92
stefan1691 17:46dc5a8e143d 93 while ( 1 )
icraggs 12:086a9314e8a5 94 {
stefan1691 17:46dc5a8e143d 95 led1 = 0;
stefan1691 17:46dc5a8e143d 96 client.yield(1000); // MQTT Client darf empfangen
stefan1691 17:46dc5a8e143d 97 led1 = 1;
stefan1691 17:46dc5a8e143d 98 wait( 0.2 );
icraggs 12:086a9314e8a5 99 }
icraggs 2:638c854c0695 100
stefan1691 19:65884a321e48 101 if ((rc = client.unsubscribe(topicSensors)) != 0)
sam_grove 5:4a257f6ac09a 102 printf("rc from unsubscribe was %d\n", rc);
icraggs 2:638c854c0695 103
icraggs 8:a3e3113054a1 104 if ((rc = client.disconnect()) != 0)
sam_grove 5:4a257f6ac09a 105 printf("rc from disconnect was %d\n", rc);
icraggs 2:638c854c0695 106
icraggs 2:638c854c0695 107 ipstack.disconnect();
icraggs 0:0cae29831d01 108 return 0;
icraggs 0:0cae29831d01 109 }