MQTT Publish - Subscribe

Dependencies:   EthernetInterface MQTT Servo mbed-rtos mbed

Fork of MQTTPublishSubcribe by smd.iotkit2.ch

Committer:
stefan1691
Date:
Mon Apr 06 06:57:40 2015 +0000
Revision:
20:d029102c00fa
Parent:
19:65884a321e48
Servo integriert

Who changed what in which revision?

User | Revision | Line number | New contents of line
stefan1691 20:d029102c00fa 1 /** MQTT Subscribe und Publish
stefan1691 19:65884a321e48 2 Wartet auf eine Publish Nachricht und sendet
stefan1691 19:65884a321e48 3 dann die dazugehoerende Nachricht zurueck */
stefan1691 17:46dc5a8e143d 4 #include "mbed.h"
icraggs 8:a3e3113054a1 5 #include "MQTTEthernet.h"
icraggs 2:638c854c0695 6 #include "MQTTClient.h"
stefan1691 20:d029102c00fa 7 #include "Servo.h"
icraggs 2:638c854c0695 8
stefan1691 19:65884a321e48 9 // UI
stefan1691 19:65884a321e48 10 DigitalOut led1( LED1 );
stefan1691 19:65884a321e48 11 // Licht Poti
stefan1691 19:65884a321e48 12 AnalogIn poti( A0 );
stefan1691 19:65884a321e48 13 // Licht Sensor
stefan1691 19:65884a321e48 14 AnalogIn light( A1 );
stefan1691 20:d029102c00fa 15 // Servos
stefan1691 20:d029102c00fa 16 Servo servo1 ( D9 );
stefan1691 19:65884a321e48 17
stefan1691 17:46dc5a8e143d 18 // MQTT Brocker
stefan1691 17:46dc5a8e143d 19 //char* hostname = "iot.eclipse.org";
stefan1691 18:a04f238cfc28 20 char* hostname = "raspi2x";
stefan1691 17:46dc5a8e143d 21 int port = 1883;
stefan1691 19:65884a321e48 22 // Client als Pointer, wird in main gesetzt.
stefan1691 19:65884a321e48 23 MQTT::Client<MQTTEthernet, Countdown> *clientGlobal;
stefan1691 19:65884a321e48 24
stefan1691 19:65884a321e48 25 // Topic's subscribe
stefan1691 19:65884a321e48 26 char* topicSensors = "mbed/k64f/iotkit/get/#";
stefan1691 19:65884a321e48 27 // Topic's publish
stefan1691 19:65884a321e48 28 char* topicPoti = "mbed/k64f/iotkit/post/poti";
stefan1691 19:65884a321e48 29 char* topicLight = "mbed/k64f/iotkit/post/light";
stefan1691 19:65884a321e48 30
stefan1691 19:65884a321e48 31 // MQTT Message
stefan1691 19:65884a321e48 32 MQTT::Message message;
stefan1691 19:65884a321e48 33 // I/O Buffer
stefan1691 19:65884a321e48 34 char buf[100];
icraggs 2:638c854c0695 35
stefan1691 19:65884a321e48 36 /** Hilfsfunktion zum Publizieren auf MQTT Broker */
stefan1691 19:65884a321e48 37 void publish( char* topic, float value )
stefan1691 19:65884a321e48 38 {
stefan1691 19:65884a321e48 39 // Message als JSON aufbereiten und senden
stefan1691 19:65884a321e48 40 sprintf( buf, "%f", value );
stefan1691 19:65884a321e48 41 printf( "Publish: %s/%s\n", topic, buf );
stefan1691 19:65884a321e48 42 message.qos = MQTT::QOS0;
stefan1691 19:65884a321e48 43 message.retained = false;
stefan1691 19:65884a321e48 44 message.dup = false;
stefan1691 19:65884a321e48 45 message.payload = (void*) buf;
stefan1691 19:65884a321e48 46 message.payloadlen = strlen(buf)+1;
stefan1691 19:65884a321e48 47 clientGlobal->publish( topic, message);
stefan1691 19:65884a321e48 48 }
stefan1691 19:65884a321e48 49
stefan1691 19:65884a321e48 50 /** Daten empfangen von MQTT Broker */
stefan1691 17:46dc5a8e143d 51 void messageArrived( MQTT::MessageData& md )
icraggs 2:638c854c0695 52 {
stefan1691 20:d029102c00fa 53 float value;
icraggs 9:5beb8609e9f7 54 MQTT::Message &message = md.message;
icraggs 9:5beb8609e9f7 55 printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);
stefan1691 19:65884a321e48 56 printf("Topic %.*s, ", md.topicName.lenstring.len, (char*) md.topicName.lenstring.data );
stefan1691 17:46dc5a8e143d 57 printf("Payload %.*s\n", message.payloadlen, (char*) message.payload);
stefan1691 19:65884a321e48 58
stefan1691 20:d029102c00fa 59 // Sensoren
stefan1691 19:65884a321e48 60 // in C sind Zeiger auf char immer Positionsangaben, deshalb sind Additionen und Subtraktionen moeglich.
stefan1691 19:65884a321e48 61 if ( strncmp( (char*) md.topicName.lenstring.data + md.topicName.lenstring.len - 4, "poti", 4 ) == 0 )
stefan1691 19:65884a321e48 62 {
stefan1691 19:65884a321e48 63 printf( "Poti %f\n", poti.read() );
stefan1691 19:65884a321e48 64 publish( topicPoti, poti.read() );
stefan1691 19:65884a321e48 65 }
stefan1691 19:65884a321e48 66 if ( strncmp( (char*) md.topicName.lenstring.data + md.topicName.lenstring.len - 5, "light", 5) == 0 )
stefan1691 19:65884a321e48 67 {
stefan1691 19:65884a321e48 68 printf( "Light %f\n", light.read() );
stefan1691 19:65884a321e48 69 publish( topicLight, light.read() );
stefan1691 20:d029102c00fa 70 }
stefan1691 20:d029102c00fa 71
stefan1691 20:d029102c00fa 72 // Aktoren
stefan1691 20:d029102c00fa 73 if ( strncmp( (char*) md.topicName.lenstring.data + md.topicName.lenstring.len - 6, "servo1", 6) == 0 )
stefan1691 20:d029102c00fa 74 {
stefan1691 20:d029102c00fa 75 sscanf( (char*) message.payload, "%f", &value );
stefan1691 20:d029102c00fa 76 servo1 = value;
stefan1691 20:d029102c00fa 77 printf( "Servo1 %f\n", value );
stefan1691 20:d029102c00fa 78 }
icraggs 2:638c854c0695 79 }
icraggs 0:0cae29831d01 80
stefan1691 17:46dc5a8e143d 81 int main()
icraggs 2:638c854c0695 82 {
stefan1691 17:46dc5a8e143d 83 // Ethernet und MQTT initialisieren (muss in main erfolgen)
icraggs 8:a3e3113054a1 84 MQTTEthernet ipstack = MQTTEthernet();
icraggs 8:a3e3113054a1 85 MQTT::Client<MQTTEthernet, Countdown> client = MQTT::Client<MQTTEthernet, Countdown>(ipstack);
stefan1691 19:65884a321e48 86 // HACK um client anderen Funktionen zur Verfuegung zu stellen
stefan1691 19:65884a321e48 87 clientGlobal = &client;
icraggs 3:7a6a899de7cc 88
stefan1691 17:46dc5a8e143d 89 printf("Connecting to %s:%d\n", hostname, port);
icraggs 6:e4c690c45021 90 int rc = ipstack.connect(hostname, port);
icraggs 6:e4c690c45021 91 if (rc != 0)
stefan1691 17:46dc5a8e143d 92 printf("rc from TCP connect is %d\n", rc);
icraggs 6:e4c690c45021 93
stefan1691 17:46dc5a8e143d 94 // mit MQTT Broker verbinden
icraggs 6:e4c690c45021 95 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
icraggs 6:e4c690c45021 96 data.MQTTVersion = 3;
icraggs 8:a3e3113054a1 97 data.clientID.cstring = "mbed-sample";
icraggs 12:086a9314e8a5 98 data.username.cstring = "testuser";
icraggs 12:086a9314e8a5 99 data.password.cstring = "testpassword";
stefan1691 17:46dc5a8e143d 100 rc = client.connect(data);
stefan1691 17:46dc5a8e143d 101 printf("rc from MQTT connect is %d\n", rc);
icraggs 2:638c854c0695 102
stefan1691 19:65884a321e48 103 rc = client.subscribe( topicSensors, MQTT::QOS0, messageArrived );
stefan1691 17:46dc5a8e143d 104 printf("rc from MQTT subscribe is %d\n", rc);
icraggs 0:0cae29831d01 105
stefan1691 17:46dc5a8e143d 106 while ( 1 )
icraggs 12:086a9314e8a5 107 {
stefan1691 17:46dc5a8e143d 108 led1 = 0;
stefan1691 17:46dc5a8e143d 109 client.yield(1000); // MQTT Client darf empfangen
stefan1691 17:46dc5a8e143d 110 led1 = 1;
stefan1691 17:46dc5a8e143d 111 wait( 0.2 );
icraggs 12:086a9314e8a5 112 }
icraggs 2:638c854c0695 113
stefan1691 19:65884a321e48 114 if ((rc = client.unsubscribe(topicSensors)) != 0)
sam_grove 5:4a257f6ac09a 115 printf("rc from unsubscribe was %d\n", rc);
icraggs 2:638c854c0695 116
icraggs 8:a3e3113054a1 117 if ((rc = client.disconnect()) != 0)
sam_grove 5:4a257f6ac09a 118 printf("rc from disconnect was %d\n", rc);
icraggs 2:638c854c0695 119
icraggs 2:638c854c0695 120 ipstack.disconnect();
icraggs 0:0cae29831d01 121 return 0;
icraggs 0:0cae29831d01 122 }