Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: FP MQTTPacket
Dependents: IDW01M1_Cloud_IBM IDW01M1-MQTT IDW01M1-MQTT-1 IDW01M1-MQTT3 ... more
Fork of MQTT by
Revision 7:f9d690fb6dad, committed 2014-04-09
- Comitter:
- Ian Craggs
- Date:
- Wed Apr 09 11:37:28 2014 +0100
- Parent:
- 6:4d312a49200b
- Child:
- 8:c46930bd6c82
- Commit message:
- Fixed some bugs
Changed in this revision
| MQTTClient.cpp | Show annotated file Show diff for this revision Revisions of this file |
| linux_main.cpp | Show annotated file Show diff for this revision Revisions of this file |
--- a/MQTTClient.cpp Tue Apr 08 22:54:37 2014 +0000
+++ b/MQTTClient.cpp Wed Apr 09 11:37:28 2014 +0100
@@ -21,8 +21,8 @@
{
buf = new char[buffer_size];
- readbuf = new char[buffer_size];
- this->ipstack = ipstack;
+ readbuf = new char[buffer_size];
+ buflen = readbuflen = buffer_size;
this->command_timeout = command_timeout;
//this->thread = new Thread(0); // only need a background thread for non-blocking mode
this->ipstack = network;
@@ -153,7 +153,8 @@
}
this->keepalive = options->keepAliveInterval;
- len = MQTTSerialize_connect(buf, buflen, options);
+ len = MQTTSerialize_connect(buf, buflen, options);
+ printf("len from send is %d %d\n", len, buflen);
rc = sendPacket(len); // send the connect packet
printf("rc from send is %d\n", rc);
@@ -205,4 +206,4 @@
}
return rc;
-}
\ No newline at end of file
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/linux_main.cpp Wed Apr 09 11:37:28 2014 +0100
@@ -0,0 +1,232 @@
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/param.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+
+#include <pthread.h>
+
+#include "MQTTClient.h"
+#include "MQTTClient.cpp"
+#include "FP.cpp"
+
+#define DEFAULT_STACK_SIZE -1
+
+class Thread {
+public:
+
+ Thread(void* (*fn)(void *parameter), void *parameter=NULL,
+ int priority=0,
+ uint32_t stack_size=DEFAULT_STACK_SIZE,
+ unsigned char *stack_pointer=NULL)
+ {
+ thread = 0;
+ pthread_attr_t attr;
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ if (pthread_create(&thread, &attr, fn, parameter) != 0)
+ thread = 0;
+ pthread_attr_destroy(&attr);
+ }
+
+private:
+ pthread_t thread;
+};
+
+
+class IPStack
+{
+public:
+ IPStack()
+ {
+
+ }
+
+ int Socket_error(const char* aString)
+ {
+
+ if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
+ {
+ if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
+ printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock);
+ }
+ return errno;
+ }
+
+ int connect(const char* hostname, int port)
+ {
+ int type = SOCK_STREAM;
+ struct sockaddr_in address;
+ int rc = -1;
+ sa_family_t family = AF_INET;
+ struct addrinfo *result = NULL;
+ struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
+
+ if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0)
+ {
+ struct addrinfo* res = result;
+
+ /* prefer ip4 addresses */
+ while (res)
+ {
+ if (res->ai_family == AF_INET)
+ {
+ result = res;
+ break;
+ }
+ res = res->ai_next;
+ }
+
+ if (result->ai_family == AF_INET)
+ {
+ address.sin_port = htons(port);
+ address.sin_family = family = AF_INET;
+ address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
+ }
+ else
+ rc = -1;
+
+ freeaddrinfo(result);
+ }
+
+ if (rc == 0)
+ {
+ mysock = socket(family, type, 0);
+ if (mysock != -1)
+ {
+ int opt = 1;
+
+ //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
+ // printf("Could not set SO_NOSIGPIPE for socket %d", mysock);
+
+ rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
+ }
+ }
+
+ return rc;
+ }
+
+ int read(char* buffer, int len, int timeout)
+ {
+ printf("reading %d bytes\n", len);
+ int rc = ::recv(mysock, buffer, (size_t)len, 0);
+ if (rc == -1)
+ Socket_error("read");
+ printf("read %d bytes\n", rc);
+ return rc;
+ }
+
+ int write(char* buffer, int len, int timeout)
+ {
+ return ::write(mysock, buffer, len);
+ }
+
+private:
+
+ int mysock;
+
+};
+
+
+class Timer
+{
+public:
+
+ Timer()
+ {
+ reset();
+ }
+
+ void start()
+ {
+ if (running)
+ return;
+ if (!timerisset(&stop_time))
+ gettimeofday(&start_time, NULL);
+ else
+ {
+ struct timeval now, res;
+
+ gettimeofday(&now, NULL);
+
+ timersub(&now, &stop_time, &res); // interval to be added to start time
+ timeradd(&start_time, &res, &stop_time);
+ stop_time = start_time;
+ timerclear(&stop_time);
+ }
+ running = true;
+ }
+
+ void stop()
+ {
+ if (running)
+ {
+ gettimeofday(&stop_time, NULL);
+ running = false;
+ }
+ }
+
+ void reset()
+ {
+ timerclear(&start_time);
+ timerclear(&stop_time);
+ running = false;
+ }
+
+ int read_ms() // get the time passed in milli-seconds
+ {
+ struct timeval now, res;
+
+ gettimeofday(&now, NULL);
+ timersub(&now, &start_time, &res);
+ return (res.tv_sec)*1000 + (res.tv_usec)/1000;
+ }
+
+private:
+
+ struct timeval start_time, stop_time;
+ bool running;
+
+};
+
+
+void messageArrived(MQTT::Message* message)
+{
+}
+
+
+int main(int argc, char* argv[])
+{
+ IPStack ipstack = IPStack();
+ Timer t;
+ FP<void, MQTT::Message*> messageArrivedPointer;
+
+ messageArrivedPointer.attach(messageArrived);
+
+ int rc = ipstack.connect("127.0.0.1", 1883);
+ printf("rc from TCP connect is %d\n", rc);
+
+ MQTT::Client<IPStack, Timer, Thread> client = MQTT::Client<IPStack, Timer, Thread>(&ipstack, &t);
+
+ printf("constructed\n");
+
+ rc = client.connect();
+ printf("rc from connect is %d\n", rc);
+
+ rc = client.subscribe("topic", MQTT::QOS2, messageArrivedPointer);
+ sleep(1);
+}
