DDD
Dependencies: HW4 FP MQTTPacket
Fork of MQTT by
Revision 7:f9d690fb6dad, committed 2014-04-09
- Comitter:
- Ian Craggs
- Date:
- Wed Apr 09 11:37:28 2014 +0100
- Parent:
- 6:4d312a49200b
- Child:
- 8:c46930bd6c82
- Commit message:
- Fixed some bugs
Changed in this revision
MQTTClient.cpp | Show annotated file Show diff for this revision Revisions of this file |
linux_main.cpp | Show annotated file Show diff for this revision Revisions of this file |
--- a/MQTTClient.cpp Tue Apr 08 22:54:37 2014 +0000 +++ b/MQTTClient.cpp Wed Apr 09 11:37:28 2014 +0100 @@ -21,8 +21,8 @@ { buf = new char[buffer_size]; - readbuf = new char[buffer_size]; - this->ipstack = ipstack; + readbuf = new char[buffer_size]; + buflen = readbuflen = buffer_size; this->command_timeout = command_timeout; //this->thread = new Thread(0); // only need a background thread for non-blocking mode this->ipstack = network; @@ -153,7 +153,8 @@ } this->keepalive = options->keepAliveInterval; - len = MQTTSerialize_connect(buf, buflen, options); + len = MQTTSerialize_connect(buf, buflen, options); + printf("len from send is %d %d\n", len, buflen); rc = sendPacket(len); // send the connect packet printf("rc from send is %d\n", rc); @@ -205,4 +206,4 @@ } return rc; -} \ No newline at end of file +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/linux_main.cpp Wed Apr 09 11:37:28 2014 +0100 @@ -0,0 +1,232 @@ + +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/param.h> +#include <sys/time.h> +#include <sys/select.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <stdio.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> + +#include <stdlib.h> +#include <string.h> +#include <signal.h> + +#include <pthread.h> + +#include "MQTTClient.h" +#include "MQTTClient.cpp" +#include "FP.cpp" + +#define DEFAULT_STACK_SIZE -1 + +class Thread { +public: + + Thread(void* (*fn)(void *parameter), void *parameter=NULL, + int priority=0, + uint32_t stack_size=DEFAULT_STACK_SIZE, + unsigned char *stack_pointer=NULL) + { + thread = 0; + pthread_attr_t attr; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (pthread_create(&thread, &attr, fn, parameter) != 0) + thread = 0; + pthread_attr_destroy(&attr); + } + +private: + pthread_t thread; +}; + + +class IPStack +{ +public: + IPStack() + { + + } + + int Socket_error(const char* aString) + { + + if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK) + { + if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET)) + printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock); + } + return errno; + } + + int connect(const char* hostname, int port) + { + int type = SOCK_STREAM; + struct sockaddr_in address; + int rc = -1; + sa_family_t family = AF_INET; + struct addrinfo *result = NULL; + struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL}; + + if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0) + { + struct addrinfo* res = result; + + /* prefer ip4 addresses */ + while (res) + { + if (res->ai_family == AF_INET) + { + result = res; + break; + } + res = res->ai_next; + } + + if (result->ai_family == AF_INET) + { + address.sin_port = htons(port); + address.sin_family = family = AF_INET; + address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr; + } + else + rc = -1; + + freeaddrinfo(result); + } + + if (rc == 0) + { + mysock = socket(family, type, 0); + if (mysock != -1) + { + int opt = 1; + + //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0) + // printf("Could not set SO_NOSIGPIPE for socket %d", mysock); + + rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address)); + } + } + + return rc; + } + + int read(char* buffer, int len, int timeout) + { + printf("reading %d bytes\n", len); + int rc = ::recv(mysock, buffer, (size_t)len, 0); + if (rc == -1) + Socket_error("read"); + printf("read %d bytes\n", rc); + return rc; + } + + int write(char* buffer, int len, int timeout) + { + return ::write(mysock, buffer, len); + } + +private: + + int mysock; + +}; + + +class Timer +{ +public: + + Timer() + { + reset(); + } + + void start() + { + if (running) + return; + if (!timerisset(&stop_time)) + gettimeofday(&start_time, NULL); + else + { + struct timeval now, res; + + gettimeofday(&now, NULL); + + timersub(&now, &stop_time, &res); // interval to be added to start time + timeradd(&start_time, &res, &stop_time); + stop_time = start_time; + timerclear(&stop_time); + } + running = true; + } + + void stop() + { + if (running) + { + gettimeofday(&stop_time, NULL); + running = false; + } + } + + void reset() + { + timerclear(&start_time); + timerclear(&stop_time); + running = false; + } + + int read_ms() // get the time passed in milli-seconds + { + struct timeval now, res; + + gettimeofday(&now, NULL); + timersub(&now, &start_time, &res); + return (res.tv_sec)*1000 + (res.tv_usec)/1000; + } + +private: + + struct timeval start_time, stop_time; + bool running; + +}; + + +void messageArrived(MQTT::Message* message) +{ +} + + +int main(int argc, char* argv[]) +{ + IPStack ipstack = IPStack(); + Timer t; + FP<void, MQTT::Message*> messageArrivedPointer; + + messageArrivedPointer.attach(messageArrived); + + int rc = ipstack.connect("127.0.0.1", 1883); + printf("rc from TCP connect is %d\n", rc); + + MQTT::Client<IPStack, Timer, Thread> client = MQTT::Client<IPStack, Timer, Thread>(&ipstack, &t); + + printf("constructed\n"); + + rc = client.connect(); + printf("rc from connect is %d\n", rc); + + rc = client.subscribe("topic", MQTT::QOS2, messageArrivedPointer); + sleep(1); +}