HelloMQTT porting to FRDM-K64F+AppShield

Dependencies:   C12832 EthernetInterface MQTT mbed-rtos mbed

Fork of HelloMQTT by MQTT

Tested with RabbitMQ 3.4.4 on OSX.

modification are below

- skip QOS2 test since RabbitMQ is not supported QOS2 message type

Committer:
hogejun
Date:
Mon Apr 13 09:41:48 2015 +0000
Revision:
17:e3aa8f5ee6ed
Parent:
8:a3e3113054a1
initial revision

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 8:a3e3113054a1 1 #if !defined(LINUX_IPSTACK_H)
icraggs 8:a3e3113054a1 2 #define LINUX_IPSTACK_H
icraggs 8:a3e3113054a1 3
icraggs 8:a3e3113054a1 4 class IPStack
icraggs 8:a3e3113054a1 5 {
icraggs 8:a3e3113054a1 6 public:
icraggs 8:a3e3113054a1 7 IPStack()
icraggs 8:a3e3113054a1 8 {
icraggs 8:a3e3113054a1 9
icraggs 8:a3e3113054a1 10 }
icraggs 8:a3e3113054a1 11
icraggs 8:a3e3113054a1 12 int Socket_error(const char* aString)
icraggs 8:a3e3113054a1 13 {
icraggs 8:a3e3113054a1 14
icraggs 8:a3e3113054a1 15 if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
icraggs 8:a3e3113054a1 16 {
icraggs 8:a3e3113054a1 17 if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
icraggs 8:a3e3113054a1 18 printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock);
icraggs 8:a3e3113054a1 19 }
icraggs 8:a3e3113054a1 20 return errno;
icraggs 8:a3e3113054a1 21 }
icraggs 8:a3e3113054a1 22
icraggs 8:a3e3113054a1 23 int connect(const char* hostname, int port)
icraggs 8:a3e3113054a1 24 {
icraggs 8:a3e3113054a1 25 int type = SOCK_STREAM;
icraggs 8:a3e3113054a1 26 struct sockaddr_in address;
icraggs 8:a3e3113054a1 27 int rc = -1;
icraggs 8:a3e3113054a1 28 sa_family_t family = AF_INET;
icraggs 8:a3e3113054a1 29 struct addrinfo *result = NULL;
icraggs 8:a3e3113054a1 30 struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
icraggs 8:a3e3113054a1 31
icraggs 8:a3e3113054a1 32 if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0)
icraggs 8:a3e3113054a1 33 {
icraggs 8:a3e3113054a1 34 struct addrinfo* res = result;
icraggs 8:a3e3113054a1 35
icraggs 8:a3e3113054a1 36 /* prefer ip4 addresses */
icraggs 8:a3e3113054a1 37 while (res)
icraggs 8:a3e3113054a1 38 {
icraggs 8:a3e3113054a1 39 if (res->ai_family == AF_INET)
icraggs 8:a3e3113054a1 40 {
icraggs 8:a3e3113054a1 41 result = res;
icraggs 8:a3e3113054a1 42 break;
icraggs 8:a3e3113054a1 43 }
icraggs 8:a3e3113054a1 44 res = res->ai_next;
icraggs 8:a3e3113054a1 45 }
icraggs 8:a3e3113054a1 46
icraggs 8:a3e3113054a1 47 if (result->ai_family == AF_INET)
icraggs 8:a3e3113054a1 48 {
icraggs 8:a3e3113054a1 49 address.sin_port = htons(port);
icraggs 8:a3e3113054a1 50 address.sin_family = family = AF_INET;
icraggs 8:a3e3113054a1 51 address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
icraggs 8:a3e3113054a1 52 }
icraggs 8:a3e3113054a1 53 else
icraggs 8:a3e3113054a1 54 rc = -1;
icraggs 8:a3e3113054a1 55
icraggs 8:a3e3113054a1 56 freeaddrinfo(result);
icraggs 8:a3e3113054a1 57 }
icraggs 8:a3e3113054a1 58
icraggs 8:a3e3113054a1 59 if (rc == 0)
icraggs 8:a3e3113054a1 60 {
icraggs 8:a3e3113054a1 61 mysock = socket(family, type, 0);
icraggs 8:a3e3113054a1 62 if (mysock != -1)
icraggs 8:a3e3113054a1 63 {
icraggs 8:a3e3113054a1 64 int opt = 1;
icraggs 8:a3e3113054a1 65
icraggs 8:a3e3113054a1 66 //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
icraggs 8:a3e3113054a1 67 // printf("Could not set SO_NOSIGPIPE for socket %d", mysock);
icraggs 8:a3e3113054a1 68
icraggs 8:a3e3113054a1 69 rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
icraggs 8:a3e3113054a1 70 }
icraggs 8:a3e3113054a1 71 }
icraggs 8:a3e3113054a1 72
icraggs 8:a3e3113054a1 73 return rc;
icraggs 8:a3e3113054a1 74 }
icraggs 8:a3e3113054a1 75
icraggs 8:a3e3113054a1 76 int read(char* buffer, int len, int timeout_ms)
icraggs 8:a3e3113054a1 77 {
icraggs 8:a3e3113054a1 78 struct timeval interval = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
icraggs 8:a3e3113054a1 79 if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec <= 0))
icraggs 8:a3e3113054a1 80 {
icraggs 8:a3e3113054a1 81 interval.tv_sec = 0;
icraggs 8:a3e3113054a1 82 interval.tv_usec = 100;
icraggs 8:a3e3113054a1 83 }
icraggs 8:a3e3113054a1 84
icraggs 8:a3e3113054a1 85 setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));
icraggs 8:a3e3113054a1 86
icraggs 8:a3e3113054a1 87 //printf("reading %d bytes\n", len);
icraggs 8:a3e3113054a1 88 int rc = ::recv(mysock, buffer, (size_t)len, 0);
icraggs 8:a3e3113054a1 89 if (rc == -1)
icraggs 8:a3e3113054a1 90 Socket_error("read");
icraggs 8:a3e3113054a1 91 //printf("read %d bytes\n", rc);
icraggs 8:a3e3113054a1 92 return rc;
icraggs 8:a3e3113054a1 93 }
icraggs 8:a3e3113054a1 94
icraggs 8:a3e3113054a1 95 int write(char* buffer, int len, int timeout)
icraggs 8:a3e3113054a1 96 {
icraggs 8:a3e3113054a1 97 struct timeval tv;
icraggs 8:a3e3113054a1 98
icraggs 8:a3e3113054a1 99 tv.tv_sec = 0; /* 30 Secs Timeout */
icraggs 8:a3e3113054a1 100 tv.tv_usec = timeout * 1000; // Not init'ing this can cause strange errors
icraggs 8:a3e3113054a1 101
icraggs 8:a3e3113054a1 102 setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,sizeof(struct timeval));
icraggs 8:a3e3113054a1 103 int rc = ::write(mysock, buffer, len);
icraggs 8:a3e3113054a1 104 //printf("write rc %d\n", rc);
icraggs 8:a3e3113054a1 105 return rc;
icraggs 8:a3e3113054a1 106 }
icraggs 8:a3e3113054a1 107
icraggs 8:a3e3113054a1 108 int disconnect()
icraggs 8:a3e3113054a1 109 {
icraggs 8:a3e3113054a1 110 return ::close(mysock);
icraggs 8:a3e3113054a1 111 }
icraggs 8:a3e3113054a1 112
icraggs 8:a3e3113054a1 113 private:
icraggs 8:a3e3113054a1 114
icraggs 8:a3e3113054a1 115 int mysock;
icraggs 8:a3e3113054a1 116
icraggs 8:a3e3113054a1 117 };
icraggs 8:a3e3113054a1 118
icraggs 8:a3e3113054a1 119 #endif