Yield function call with timeout works, there was an issue with blockant socket.
Fork of HelloMQTT by
Revision 3:7a6a899de7cc, committed 2014-05-06
- Comitter:
- icraggs
- Date:
- Tue May 06 09:45:22 2014 +0000
- Parent:
- 2:638c854c0695
- Child:
- 4:c696f27c1850
- Child:
- 6:e4c690c45021
- Commit message:
- Use latest MQTT library on both mbed and Linux
Changed in this revision
--- a/MQTT.lib Wed Apr 30 13:16:09 2014 +0000 +++ b/MQTT.lib Tue May 06 09:45:22 2014 +0000 @@ -1,1 +1,1 @@ -http://mbed.org/teams/mqtt/code/MQTT/#aadb79d29330 +http://mbed.org/teams/mqtt/code/MQTT/#05fc7de97d4a
--- a/main.cpp Wed Apr 30 13:16:09 2014 +0000 +++ b/main.cpp Tue May 06 09:45:22 2014 +0000 @@ -153,13 +153,13 @@ lcd.printf("Version is %f\n", version); - MQTT::Client<IPStack, Countdown>client = MQTT::Client<IPStack, Countdown>(&ipstack); + MQTT::Client<IPStack, Countdown> client = MQTT::Client<IPStack, Countdown>(ipstack); + + client.setConnectionLostHandler(connect); MQTT::Client<IPStack, Countdown>::connectionLostInfo info = {&client, &ipstack}; int rc = connect(&info); - client.setConnectionLostHandler(connect); - rc = client.subscribe(topic, MQTT::QOS1, messageArrived); if (rc != 0) lcd.printf("rc from MQTT subscribe is %d\n", rc);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/main_linux.cpp Tue May 06 09:45:22 2014 +0000 @@ -0,0 +1,324 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + + /** + This is a sample program to illustrate the use of the MQTT Client library + on Linux. The Client class requires two classes which mediate + access to system interfaces for networking and timing. As long as these two + classes provide the required public programming interfaces, it does not matter + what facilities they use underneath. In this program, they use the Linux + system libraries. + + */ + +#if defined(LINUX) + +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/param.h> +#include <sys/time.h> +#include <sys/select.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <stdio.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> + +#include <stdlib.h> +#include <string.h> +#include <signal.h> + +#include "MQTTClient.h" +#include "FP.cpp" + +#define DEFAULT_STACK_SIZE -1 + + +class IPStack +{ +public: + IPStack() + { + + } + + int Socket_error(const char* aString) + { + + if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK) + { + if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET)) + printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock); + } + return errno; + } + + int connect(const char* hostname, int port) + { + int type = SOCK_STREAM; + struct sockaddr_in address; + int rc = -1; + sa_family_t family = AF_INET; + struct addrinfo *result = NULL; + struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL}; + + if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0) + { + struct addrinfo* res = result; + + /* prefer ip4 addresses */ + while (res) + { + if (res->ai_family == AF_INET) + { + result = res; + break; + } + res = res->ai_next; + } + + if (result->ai_family == AF_INET) + { + address.sin_port = htons(port); + address.sin_family = family = AF_INET; + address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr; + } + else + rc = -1; + + freeaddrinfo(result); + } + + if (rc == 0) + { + mysock = socket(family, type, 0); + if (mysock != -1) + { + int opt = 1; + + //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0) + // printf("Could not set SO_NOSIGPIPE for socket %d", mysock); + + rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address)); + } + } + + return rc; + } + + int read(char* buffer, int len, int timeout_ms) + { + struct timeval interval = {timeout_ms / 1000, (timeout_ms % 1000) * 1000}; + if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec <= 0)) + { + interval.tv_sec = 0; + interval.tv_usec = 100; + } + + setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval)); + + //printf("reading %d bytes\n", len); + int rc = ::recv(mysock, buffer, (size_t)len, 0); + if (rc == -1) + Socket_error("read"); + //printf("read %d bytes\n", rc); + return rc; + } + + int write(char* buffer, int len, int timeout) + { + struct timeval tv; + + tv.tv_sec = 0; /* 30 Secs Timeout */ + tv.tv_usec = timeout * 1000; // Not init'ing this can cause strange errors + + setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,sizeof(struct timeval)); + int rc = ::write(mysock, buffer, len); + //printf("write rc %d\n", rc); + return rc; + } + + int disconnect() + { + return ::close(mysock); + } + +private: + + int mysock; + +}; + + +class Countdown +{ +public: + Countdown() + { + + } + + Countdown(int ms) + { + countdown_ms(ms); + } + + + bool expired() + { + struct timeval now, res; + gettimeofday(&now, NULL); + timersub(&end_time, &now, &res); + //printf("left %d ms\n", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000); + //if (res.tv_sec > 0 || res.tv_usec > 0) + // printf("expired %d %d\n", res.tv_sec, res.tv_usec); + return res.tv_sec < 0 || (res.tv_sec == 0 && res.tv_usec <= 0); + } + + + void countdown_ms(int ms) + { + struct timeval now; + gettimeofday(&now, NULL); + struct timeval interval = {ms / 1000, (ms % 1000) * 1000}; + //printf("interval %d %d\n", interval.tv_sec, interval.tv_usec); + timeradd(&now, &interval, &end_time); + } + + + void countdown(int seconds) + { + struct timeval now; + gettimeofday(&now, NULL); + struct timeval interval = {seconds, 0}; + timeradd(&now, &interval, &end_time); + } + + + int left_ms() + { + struct timeval now, res; + gettimeofday(&now, NULL); + timersub(&end_time, &now, &res); + //printf("left %d ms\n", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000); + return (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000; + } + +private: + + struct timeval end_time; +}; + + +int arrivedcount = 0; + +void messageArrived(MQTT::Message* message) +{ + printf("Message %d arrived: qos %d, retained %d, dup %d, packetid %d\n", + ++arrivedcount, message->qos, message->retained, message->dup, message->id); + printf("Payload %.*s\n", message->payloadlen, (char*)message->payload); +} + + +int connect(MQTT::Client<IPStack, Countdown>::connectionLostInfo* info) +{ + const char* hostname = "localhost"; //"m2m.eclipse.org"; + int port = 1883; + printf("Connecting to %s:%d\n", hostname, port); + int rc = info->network->connect(hostname, port); + if (rc != 0) + printf("rc from TCP connect is %d\n", rc); + + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + data.MQTTVersion = 3; + data.clientID.cstring = (char*)"mbed-icraggs"; + rc = info->client->connect(&data); + if (rc != 0) + printf("rc from MQTT connect is %d\n", rc); + + return rc; +} + + +int main(int argc, char* argv[]) +{ + IPStack ipstack = IPStack(); + float version = 0.3; + const char* topic = "mbed-sample"; + + printf("Version is %f\n", version); + + MQTT::Client<IPStack, Countdown> client = MQTT::Client<IPStack, Countdown>(ipstack); + + client.setConnectionLostHandler(connect); + + MQTT::Client<IPStack, Countdown>::connectionLostInfo info = {&client, &ipstack}; + int rc = connect(&info); + + rc = client.subscribe(topic, MQTT::QOS2, messageArrived); + if (rc != 0) + printf("rc from MQTT subscribe is %d\n", rc); + + MQTT::Message message; + + // QoS 0 + char buf[100]; + sprintf(buf, "Hello World! QoS 0 message from app version %f", version); + message.qos = MQTT::QOS0; + message.retained = false; + message.dup = false; + message.payload = (void*)buf; + message.payloadlen = strlen(buf)+1; + rc = client.publish(topic, &message); + while (arrivedcount == 0) + client.yield(100); + + // QoS 1 + printf("Now QoS 1\n"); + sprintf(buf, "Hello World! QoS 1 message from app version %f", version); + message.qos = MQTT::QOS1; + message.payloadlen = strlen(buf)+1; + rc = client.publish(topic, &message); + while (arrivedcount == 1) + client.yield(100); + + // QoS 2 + sprintf(buf, "Hello World! QoS 2 message from app version %f", version); + message.qos = MQTT::QOS2; + message.payloadlen = strlen(buf)+1; + rc = client.publish(topic, &message); + while (arrivedcount == 2) + client.yield(100); + + rc = client.unsubscribe(topic); + if (rc != 0) + printf("rc from unsubscribe was %d\n", rc); + + rc = client.disconnect(); + if (rc != 0) + printf("rc from disconnect was %d\n", rc); + + ipstack.disconnect(); + + printf("Finishing with %d messages received\n", arrivedcount); + + return 0; +} + +#endif