Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
Revision 7:f9d690fb6dad, committed 2014-04-09
- Comitter:
- Ian Craggs
- Date:
- Wed Apr 09 11:37:28 2014 +0100
- Parent:
- 6:4d312a49200b
- Child:
- 8:c46930bd6c82
- Commit message:
- Fixed some bugs
Changed in this revision
| MQTTClient.cpp | Show annotated file Show diff for this revision Revisions of this file |
| linux_main.cpp | Show annotated file Show diff for this revision Revisions of this file |
--- a/MQTTClient.cpp Tue Apr 08 22:54:37 2014 +0000
+++ b/MQTTClient.cpp Wed Apr 09 11:37:28 2014 +0100
@@ -21,8 +21,8 @@
{
buf = new char[buffer_size];
- readbuf = new char[buffer_size];
- this->ipstack = ipstack;
+ readbuf = new char[buffer_size];
+ buflen = readbuflen = buffer_size;
this->command_timeout = command_timeout;
//this->thread = new Thread(0); // only need a background thread for non-blocking mode
this->ipstack = network;
@@ -153,7 +153,8 @@
}
this->keepalive = options->keepAliveInterval;
- len = MQTTSerialize_connect(buf, buflen, options);
+ len = MQTTSerialize_connect(buf, buflen, options);
+ printf("len from send is %d %d\n", len, buflen);
rc = sendPacket(len); // send the connect packet
printf("rc from send is %d\n", rc);
@@ -205,4 +206,4 @@
}
return rc;
-}
\ No newline at end of file
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/linux_main.cpp Wed Apr 09 11:37:28 2014 +0100
@@ -0,0 +1,232 @@
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/param.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+
+#include <pthread.h>
+
+#include "MQTTClient.h"
+#include "MQTTClient.cpp"
+#include "FP.cpp"
+
+#define DEFAULT_STACK_SIZE -1
+
+class Thread {
+public:
+
+ Thread(void* (*fn)(void *parameter), void *parameter=NULL,
+ int priority=0,
+ uint32_t stack_size=DEFAULT_STACK_SIZE,
+ unsigned char *stack_pointer=NULL)
+ {
+ thread = 0;
+ pthread_attr_t attr;
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ if (pthread_create(&thread, &attr, fn, parameter) != 0)
+ thread = 0;
+ pthread_attr_destroy(&attr);
+ }
+
+private:
+ pthread_t thread;
+};
+
+
+class IPStack
+{
+public:
+ IPStack()
+ {
+
+ }
+
+ int Socket_error(const char* aString)
+ {
+
+ if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
+ {
+ if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
+ printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock);
+ }
+ return errno;
+ }
+
+ int connect(const char* hostname, int port)
+ {
+ int type = SOCK_STREAM;
+ struct sockaddr_in address;
+ int rc = -1;
+ sa_family_t family = AF_INET;
+ struct addrinfo *result = NULL;
+ struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
+
+ if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0)
+ {
+ struct addrinfo* res = result;
+
+ /* prefer ip4 addresses */
+ while (res)
+ {
+ if (res->ai_family == AF_INET)
+ {
+ result = res;
+ break;
+ }
+ res = res->ai_next;
+ }
+
+ if (result->ai_family == AF_INET)
+ {
+ address.sin_port = htons(port);
+ address.sin_family = family = AF_INET;
+ address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
+ }
+ else
+ rc = -1;
+
+ freeaddrinfo(result);
+ }
+
+ if (rc == 0)
+ {
+ mysock = socket(family, type, 0);
+ if (mysock != -1)
+ {
+ int opt = 1;
+
+ //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
+ // printf("Could not set SO_NOSIGPIPE for socket %d", mysock);
+
+ rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
+ }
+ }
+
+ return rc;
+ }
+
+ int read(char* buffer, int len, int timeout)
+ {
+ printf("reading %d bytes\n", len);
+ int rc = ::recv(mysock, buffer, (size_t)len, 0);
+ if (rc == -1)
+ Socket_error("read");
+ printf("read %d bytes\n", rc);
+ return rc;
+ }
+
+ int write(char* buffer, int len, int timeout)
+ {
+ return ::write(mysock, buffer, len);
+ }
+
+private:
+
+ int mysock;
+
+};
+
+
+class Timer
+{
+public:
+
+ Timer()
+ {
+ reset();
+ }
+
+ void start()
+ {
+ if (running)
+ return;
+ if (!timerisset(&stop_time))
+ gettimeofday(&start_time, NULL);
+ else
+ {
+ struct timeval now, res;
+
+ gettimeofday(&now, NULL);
+
+ timersub(&now, &stop_time, &res); // interval to be added to start time
+ timeradd(&start_time, &res, &stop_time);
+ stop_time = start_time;
+ timerclear(&stop_time);
+ }
+ running = true;
+ }
+
+ void stop()
+ {
+ if (running)
+ {
+ gettimeofday(&stop_time, NULL);
+ running = false;
+ }
+ }
+
+ void reset()
+ {
+ timerclear(&start_time);
+ timerclear(&stop_time);
+ running = false;
+ }
+
+ int read_ms() // get the time passed in milli-seconds
+ {
+ struct timeval now, res;
+
+ gettimeofday(&now, NULL);
+ timersub(&now, &start_time, &res);
+ return (res.tv_sec)*1000 + (res.tv_usec)/1000;
+ }
+
+private:
+
+ struct timeval start_time, stop_time;
+ bool running;
+
+};
+
+
+void messageArrived(MQTT::Message* message)
+{
+}
+
+
+int main(int argc, char* argv[])
+{
+ IPStack ipstack = IPStack();
+ Timer t;
+ FP<void, MQTT::Message*> messageArrivedPointer;
+
+ messageArrivedPointer.attach(messageArrived);
+
+ int rc = ipstack.connect("127.0.0.1", 1883);
+ printf("rc from TCP connect is %d\n", rc);
+
+ MQTT::Client<IPStack, Timer, Thread> client = MQTT::Client<IPStack, Timer, Thread>(&ipstack, &t);
+
+ printf("constructed\n");
+
+ rc = client.connect();
+ printf("rc from connect is %d\n", rc);
+
+ rc = client.subscribe("topic", MQTT::QOS2, messageArrivedPointer);
+ sleep(1);
+}
