V148

Fork of RadioHead-148 by David Rimer

Revision:
0:ab4e012489ef
diff -r 000000000000 -r ab4e012489ef RH_TCP.cpp
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/RH_TCP.cpp	Thu Oct 15 01:27:00 2015 +0000
@@ -0,0 +1,301 @@
+// RH_TCP.cpp
+//
+// Copyright (C) 2014 Mike McCauley
+// $Id: RH_TCP.cpp,v 1.5 2015/08/13 02:45:47 mikem Exp $
+
+#include <RadioHead.h>
+
+// This can only build on Linux and compatible systems
+#if (RH_PLATFORM == RH_PLATFORM_UNIX) 
+
+#include <RH_TCP.h>
+#include <sys/types.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <sys/ioctl.h>
+#include <netdb.h>
+#include <string>
+
+RH_TCP::RH_TCP(const char* server)
+    : _server(server),
+      _rxBufLen(0),
+      _rxBufValid(false),
+      _socket(-1)
+{
+}
+    
+bool RH_TCP::init()
+{   
+    if (!connectToServer())
+	return false;
+    return sendThisAddress(_thisAddress);
+}
+    
+bool RH_TCP::connectToServer()
+{
+    struct addrinfo hints;
+    struct addrinfo *result, *rp;
+    int sfd, s;
+    struct sockaddr_storage peer_addr;
+    socklen_t peer_addr_len;
+
+    memset(&hints, 0, sizeof(struct addrinfo));
+    hints.ai_family = AF_UNSPEC;    // Allow IPv4 or IPv6
+    hints.ai_socktype = SOCK_STREAM; // Stream socket
+    hints.ai_flags = AI_PASSIVE;    // For wildcard IP address 
+    hints.ai_protocol = 0;          // Any protocol 
+    hints.ai_canonname = NULL;
+    hints.ai_addr = NULL;
+    hints.ai_next = NULL;
+    
+    std::string server(_server);
+    std::string port("4000");
+    size_t indexOfSeparator = server.find_first_of(':');
+    if (indexOfSeparator != std::string::npos)
+    {
+	port = server.substr(indexOfSeparator+1);
+	server.erase(indexOfSeparator);
+    }
+
+    s = getaddrinfo(server.c_str(), port.c_str(), &hints, &result);
+    if (s != 0) 
+    {
+	fprintf(stderr, "RH_TCP::connect getaddrinfo failed: %s\n", gai_strerror(s));
+	return false;
+    }
+
+    // getaddrinfo() returns a list of address structures.
+    // Try each address until we successfully connect(2).
+    // If socket(2) (or connect(2)) fails, we (close the socket
+    // and) try the next address. */
+
+    for (rp = result; rp != NULL; rp = rp->ai_next) 
+    {
+	_socket = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+	if (_socket == -1)
+	    continue;
+	
+	if (connect(_socket, rp->ai_addr, rp->ai_addrlen) == 0)
+	    break;                  /* Success */
+
+	close(_socket);
+    }
+
+    if (rp == NULL) 
+    {               /* No address succeeded */
+	fprintf(stderr, "RH_TCP::connect could not connect to %s\n", _server);
+	return false;
+    }
+
+    freeaddrinfo(result);           /* No longer needed */
+
+    // Now make the socket non-blocking
+    int on = 1;
+    int rc = ioctl(_socket, FIONBIO, (char *)&on);
+    if (rc < 0)
+    {
+	fprintf(stderr,"RH_TCP::init failed to set socket non-blocking: %s\n", strerror(errno));
+	close(_socket);
+	_socket = -1;
+	return false;
+    }
+    return true;
+}
+
+void RH_TCP::clearRxBuf()
+{
+    _rxBufValid = false;
+    _rxBufLen = 0;
+}
+
+void RH_TCP::checkForEvents()
+{
+    #define RH_TCP_SOCKETBUF_LEN 500
+    static uint8_t socketBuf[RH_TCP_SOCKETBUF_LEN]; // Room for several messages
+    static uint16_t socketBufLen = 0;
+
+    // Read at most the amount of space we have left in the buffer
+    ssize_t count = read(_socket, socketBuf + socketBufLen, sizeof(socketBuf) - socketBufLen);
+    if (count < 0)
+    {
+	if (errno != EAGAIN)
+	{
+	    fprintf(stderr,"RH_TCP::checkForEvents read error: %s\n", strerror(errno));
+	    exit(1);
+	}
+    }
+    else if (count == 0)
+    {
+	// End of file
+	fprintf(stderr,"RH_TCP::checkForEvents unexpected end of file on read\n");
+	exit(1);
+    }
+    else
+    {
+	socketBufLen += count;
+	while (socketBufLen >= 5)
+	{
+	    RHTcpTypeMessage* message = ((RHTcpTypeMessage*)socketBuf);
+	    uint32_t len = ntohl(message->length);
+	    uint32_t messageLen = len + sizeof(message->length);
+	    if (len > sizeof(socketBuf) - sizeof(message->length))
+	    {
+		// Bogus length
+		fprintf(stderr, "RH_TCP::checkForEvents read ridiculous length: %d. Corrupt message stream? Aborting\n", len);
+		exit(1);
+	    }
+	    if (socketBufLen >= len + sizeof(message->length))
+	    {
+		// Got at least all of this message
+		if (message->type == RH_TCP_MESSAGE_TYPE_PACKET && len >= 5)
+		{
+		    // REVISIT: need to check if we are actually receiving?
+		    // Its a new packet, extract the headers and payload
+		    RHTcpPacket* packet = ((RHTcpPacket*)socketBuf);
+		    _rxHeaderTo    = packet->to;
+		    _rxHeaderFrom  = packet->from;
+		    _rxHeaderId    = packet->id;
+		    _rxHeaderFlags = packet->flags;
+		    uint32_t payloadLen = len - 5;
+		    if (payloadLen <= sizeof(_rxBuf))
+		    {
+			// Enough room in our receiver buffer
+			memcpy(_rxBuf, packet->payload, payloadLen);
+			_rxBufLen = payloadLen;
+			_rxBufFull = true;
+		    }
+		}
+		// check for other message types here
+		// Now remove the used message by copying the trailing bytes (maybe start of a new message?)
+		// to the top of the buffer
+		memcpy(socketBuf, socketBuf + messageLen, sizeof(socketBuf) - messageLen);
+		socketBufLen -= messageLen;
+	    }
+	}
+    }
+}
+
+void RH_TCP::validateRxBuf()
+{
+    // The headers have already been extracted
+    if (_promiscuous ||
+	_rxHeaderTo == _thisAddress ||
+	_rxHeaderTo == RH_BROADCAST_ADDRESS)
+    {
+	_rxGood++;
+	_rxBufValid = true;
+    }
+}
+
+bool RH_TCP::available()
+{
+    if (_socket < 0)
+	return false;
+    checkForEvents();
+    if (_rxBufFull)
+    {
+	validateRxBuf();
+	_rxBufFull= false;
+    }
+    return _rxBufValid;
+}
+
+// Block until something is available
+void RH_TCP::waitAvailable()
+{
+    waitAvailableTimeout(0); // 0 = Wait forever
+}
+
+// Block until something is available or timeout expires
+bool RH_TCP::waitAvailableTimeout(uint16_t timeout)
+{
+    int            max_fd;
+    fd_set         input;
+    int            result;
+
+    FD_ZERO(&input);
+    FD_SET(_socket, &input);
+    max_fd = _socket + 1;
+
+    if (timeout)
+    {
+	struct timeval timer;
+	// Timeout is in milliseconds
+	timer.tv_sec  = timeout / 1000;
+	timer.tv_usec = (timeout % 1000) * 1000;
+	result = select(max_fd, &input, NULL, NULL, &timer);
+    }
+    else
+    {
+	result = select(max_fd, &input, NULL, NULL, NULL);
+    }
+    if (result < 0)
+	fprintf(stderr, "RH_TCP::waitAvailableTimeout: select failed %s\n", strerror(errno));
+    return result > 0;
+}
+
+bool RH_TCP::recv(uint8_t* buf, uint8_t* len)
+{
+    if (!available())
+	return false;
+
+    if (buf && len)
+    {
+	if (*len > _rxBufLen)
+	    *len = _rxBufLen;
+	memcpy(buf, _rxBuf, *len);
+    }
+    clearRxBuf();
+    return true;
+}
+
+bool RH_TCP::send(const uint8_t* data, uint8_t len)
+{
+    bool ret = sendPacket(data, len);
+    delay(10); // Wait for transmit to succeed. REVISIT: depends on length and speed
+    return ret;
+}
+
+uint8_t RH_TCP::maxMessageLength()
+{
+    return RH_TCP_MAX_MESSAGE_LEN;
+}
+
+void RH_TCP::setThisAddress(uint8_t address)
+{
+    RHGenericDriver::setThisAddress(address);
+    sendThisAddress(_thisAddress);
+}
+
+bool RH_TCP::sendThisAddress(uint8_t thisAddress)
+{
+    if (_socket < 0)
+	return false;
+    RHTcpThisAddress m;
+    m.length = htonl(2);
+    m.type = RH_TCP_MESSAGE_TYPE_THISADDRESS;
+    m.thisAddress = thisAddress;
+    ssize_t sent = write(_socket, &m, sizeof(m));
+    return sent > 0;
+}
+
+bool RH_TCP::sendPacket(const uint8_t* data, uint8_t len)
+{
+    if (_socket < 0)
+	return false;
+    RHTcpPacket m;
+    m.length = htonl(len + 4);
+    m.type  = RH_TCP_MESSAGE_TYPE_PACKET;
+    m.to    = _txHeaderTo;
+    m.from  = _txHeaderFrom;
+    m.id    = _txHeaderId;
+    m.flags = _txHeaderFlags;
+    memcpy(m.payload, data, len);
+    ssize_t sent = write(_socket, &m, len + 8);
+    return sent > 0;
+}
+
+#endif