Sample MQTT program - simple send and receive

Dependencies:   C12832 MQTT

Dependents:   MQTT_G_SENSOR

This program and the MQTT libraries it uses are part of the EclipseTM Paho project; specifically the embedded client.

This example and API are working, but are still in progress. Please give us your feedback.

HelloMQTT is an example of using the MQTT API. The MQTT API is portable across network interface stacks. MQTT is designed to be used with TCP/IP, but any transport with similar characteristics should be suitable.

HelloMQTT uses the NetworkInterface APIs in mbed OS 5 to show how this works. The MQTT library contains an MQTTNetwork.h header, which is a wrapper around the mbed networking interface. To switch between connectivity methods (the default is Ethernet) the easy-connect library is provided in this example application. You can change the connectivity method in mbed_app.json.

Adding new connectivity methods to the program is trivial, as long as they implement the mbed OS 5 NetworkStack API.

Committer:
Jan Jongboom
Date:
Mon Sep 11 16:43:49 2017 +0200
Revision:
22:f5f6c9059eed
Parent:
8:a3e3113054a1
Roll back update to mbed OS 5.5

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 8:a3e3113054a1 1 #if !defined(LINUX_IPSTACK_H)
icraggs 8:a3e3113054a1 2 #define LINUX_IPSTACK_H
icraggs 8:a3e3113054a1 3
icraggs 8:a3e3113054a1 4 class IPStack
icraggs 8:a3e3113054a1 5 {
icraggs 8:a3e3113054a1 6 public:
icraggs 8:a3e3113054a1 7 IPStack()
icraggs 8:a3e3113054a1 8 {
icraggs 8:a3e3113054a1 9
icraggs 8:a3e3113054a1 10 }
icraggs 8:a3e3113054a1 11
icraggs 8:a3e3113054a1 12 int Socket_error(const char* aString)
icraggs 8:a3e3113054a1 13 {
icraggs 8:a3e3113054a1 14
icraggs 8:a3e3113054a1 15 if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
icraggs 8:a3e3113054a1 16 {
icraggs 8:a3e3113054a1 17 if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
icraggs 8:a3e3113054a1 18 printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock);
icraggs 8:a3e3113054a1 19 }
icraggs 8:a3e3113054a1 20 return errno;
icraggs 8:a3e3113054a1 21 }
icraggs 8:a3e3113054a1 22
icraggs 8:a3e3113054a1 23 int connect(const char* hostname, int port)
icraggs 8:a3e3113054a1 24 {
icraggs 8:a3e3113054a1 25 int type = SOCK_STREAM;
icraggs 8:a3e3113054a1 26 struct sockaddr_in address;
icraggs 8:a3e3113054a1 27 int rc = -1;
icraggs 8:a3e3113054a1 28 sa_family_t family = AF_INET;
icraggs 8:a3e3113054a1 29 struct addrinfo *result = NULL;
icraggs 8:a3e3113054a1 30 struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
icraggs 8:a3e3113054a1 31
icraggs 8:a3e3113054a1 32 if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0)
icraggs 8:a3e3113054a1 33 {
icraggs 8:a3e3113054a1 34 struct addrinfo* res = result;
icraggs 8:a3e3113054a1 35
icraggs 8:a3e3113054a1 36 /* prefer ip4 addresses */
icraggs 8:a3e3113054a1 37 while (res)
icraggs 8:a3e3113054a1 38 {
icraggs 8:a3e3113054a1 39 if (res->ai_family == AF_INET)
icraggs 8:a3e3113054a1 40 {
icraggs 8:a3e3113054a1 41 result = res;
icraggs 8:a3e3113054a1 42 break;
icraggs 8:a3e3113054a1 43 }
icraggs 8:a3e3113054a1 44 res = res->ai_next;
icraggs 8:a3e3113054a1 45 }
icraggs 8:a3e3113054a1 46
icraggs 8:a3e3113054a1 47 if (result->ai_family == AF_INET)
icraggs 8:a3e3113054a1 48 {
icraggs 8:a3e3113054a1 49 address.sin_port = htons(port);
icraggs 8:a3e3113054a1 50 address.sin_family = family = AF_INET;
icraggs 8:a3e3113054a1 51 address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
icraggs 8:a3e3113054a1 52 }
icraggs 8:a3e3113054a1 53 else
icraggs 8:a3e3113054a1 54 rc = -1;
icraggs 8:a3e3113054a1 55
icraggs 8:a3e3113054a1 56 freeaddrinfo(result);
icraggs 8:a3e3113054a1 57 }
icraggs 8:a3e3113054a1 58
icraggs 8:a3e3113054a1 59 if (rc == 0)
icraggs 8:a3e3113054a1 60 {
icraggs 8:a3e3113054a1 61 mysock = socket(family, type, 0);
icraggs 8:a3e3113054a1 62 if (mysock != -1)
icraggs 8:a3e3113054a1 63 {
icraggs 8:a3e3113054a1 64 int opt = 1;
icraggs 8:a3e3113054a1 65
icraggs 8:a3e3113054a1 66 //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
icraggs 8:a3e3113054a1 67 // printf("Could not set SO_NOSIGPIPE for socket %d", mysock);
icraggs 8:a3e3113054a1 68
icraggs 8:a3e3113054a1 69 rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
icraggs 8:a3e3113054a1 70 }
icraggs 8:a3e3113054a1 71 }
icraggs 8:a3e3113054a1 72
icraggs 8:a3e3113054a1 73 return rc;
icraggs 8:a3e3113054a1 74 }
icraggs 8:a3e3113054a1 75
icraggs 8:a3e3113054a1 76 int read(char* buffer, int len, int timeout_ms)
icraggs 8:a3e3113054a1 77 {
icraggs 8:a3e3113054a1 78 struct timeval interval = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
icraggs 8:a3e3113054a1 79 if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec <= 0))
icraggs 8:a3e3113054a1 80 {
icraggs 8:a3e3113054a1 81 interval.tv_sec = 0;
icraggs 8:a3e3113054a1 82 interval.tv_usec = 100;
icraggs 8:a3e3113054a1 83 }
icraggs 8:a3e3113054a1 84
icraggs 8:a3e3113054a1 85 setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));
icraggs 8:a3e3113054a1 86
icraggs 8:a3e3113054a1 87 //printf("reading %d bytes\n", len);
icraggs 8:a3e3113054a1 88 int rc = ::recv(mysock, buffer, (size_t)len, 0);
icraggs 8:a3e3113054a1 89 if (rc == -1)
icraggs 8:a3e3113054a1 90 Socket_error("read");
icraggs 8:a3e3113054a1 91 //printf("read %d bytes\n", rc);
icraggs 8:a3e3113054a1 92 return rc;
icraggs 8:a3e3113054a1 93 }
icraggs 8:a3e3113054a1 94
icraggs 8:a3e3113054a1 95 int write(char* buffer, int len, int timeout)
icraggs 8:a3e3113054a1 96 {
icraggs 8:a3e3113054a1 97 struct timeval tv;
icraggs 8:a3e3113054a1 98
icraggs 8:a3e3113054a1 99 tv.tv_sec = 0; /* 30 Secs Timeout */
icraggs 8:a3e3113054a1 100 tv.tv_usec = timeout * 1000; // Not init'ing this can cause strange errors
icraggs 8:a3e3113054a1 101
icraggs 8:a3e3113054a1 102 setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,sizeof(struct timeval));
icraggs 8:a3e3113054a1 103 int rc = ::write(mysock, buffer, len);
icraggs 8:a3e3113054a1 104 //printf("write rc %d\n", rc);
icraggs 8:a3e3113054a1 105 return rc;
icraggs 8:a3e3113054a1 106 }
icraggs 8:a3e3113054a1 107
icraggs 8:a3e3113054a1 108 int disconnect()
icraggs 8:a3e3113054a1 109 {
icraggs 8:a3e3113054a1 110 return ::close(mysock);
icraggs 8:a3e3113054a1 111 }
icraggs 8:a3e3113054a1 112
icraggs 8:a3e3113054a1 113 private:
icraggs 8:a3e3113054a1 114
icraggs 8:a3e3113054a1 115 int mysock;
icraggs 8:a3e3113054a1 116
icraggs 8:a3e3113054a1 117 };
icraggs 8:a3e3113054a1 118
icraggs 8:a3e3113054a1 119 #endif