David Rimer / RadioHead-148
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers RH_TCP.cpp Source File

RH_TCP.cpp

00001 // RH_TCP.cpp
00002 //
00003 // Copyright (C) 2014 Mike McCauley
00004 // $Id: RH_TCP.cpp,v 1.5 2015/08/13 02:45:47 mikem Exp $
00005 
00006 #include <RadioHead.h>
00007 
00008 // This can only build on Linux and compatible systems
00009 #if (RH_PLATFORM == RH_PLATFORM_UNIX) 
00010 
00011 #include <RH_TCP.h>
00012 #include <sys/types.h>
00013 #include <errno.h>
00014 #include <sys/socket.h>
00015 #include <netinet/in.h>
00016 #include <arpa/inet.h>
00017 #include <unistd.h>
00018 #include <sys/ioctl.h>
00019 #include <netdb.h>
00020 #include <string>
00021 
00022 RH_TCP::RH_TCP(const char* server)
00023     : _server(server),
00024       _rxBufLen(0),
00025       _rxBufValid(false),
00026       _socket(-1)
00027 {
00028 }
00029     
00030 bool RH_TCP::init()
00031 {   
00032     if (!connectToServer())
00033     return false;
00034     return sendThisAddress(_thisAddress);
00035 }
00036     
00037 bool RH_TCP::connectToServer()
00038 {
00039     struct addrinfo hints;
00040     struct addrinfo *result, *rp;
00041     int sfd, s;
00042     struct sockaddr_storage peer_addr;
00043     socklen_t peer_addr_len;
00044 
00045     memset(&hints, 0, sizeof(struct addrinfo));
00046     hints.ai_family = AF_UNSPEC;    // Allow IPv4 or IPv6
00047     hints.ai_socktype = SOCK_STREAM; // Stream socket
00048     hints.ai_flags = AI_PASSIVE;    // For wildcard IP address 
00049     hints.ai_protocol = 0;          // Any protocol 
00050     hints.ai_canonname = NULL;
00051     hints.ai_addr = NULL;
00052     hints.ai_next = NULL;
00053     
00054     std::string server(_server);
00055     std::string port("4000");
00056     size_t indexOfSeparator = server.find_first_of(':');
00057     if (indexOfSeparator != std::string::npos)
00058     {
00059     port = server.substr(indexOfSeparator+1);
00060     server.erase(indexOfSeparator);
00061     }
00062 
00063     s = getaddrinfo(server.c_str(), port.c_str(), &hints, &result);
00064     if (s != 0) 
00065     {
00066     fprintf(stderr, "RH_TCP::connect getaddrinfo failed: %s\n", gai_strerror(s));
00067     return false;
00068     }
00069 
00070     // getaddrinfo() returns a list of address structures.
00071     // Try each address until we successfully connect(2).
00072     // If socket(2) (or connect(2)) fails, we (close the socket
00073     // and) try the next address. */
00074 
00075     for (rp = result; rp != NULL; rp = rp->ai_next) 
00076     {
00077     _socket = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
00078     if (_socket == -1)
00079         continue;
00080     
00081     if (connect(_socket, rp->ai_addr, rp->ai_addrlen) == 0)
00082         break;                  /* Success */
00083 
00084     close(_socket);
00085     }
00086 
00087     if (rp == NULL) 
00088     {               /* No address succeeded */
00089     fprintf(stderr, "RH_TCP::connect could not connect to %s\n", _server);
00090     return false;
00091     }
00092 
00093     freeaddrinfo(result);           /* No longer needed */
00094 
00095     // Now make the socket non-blocking
00096     int on = 1;
00097     int rc = ioctl(_socket, FIONBIO, (char *)&on);
00098     if (rc < 0)
00099     {
00100     fprintf(stderr,"RH_TCP::init failed to set socket non-blocking: %s\n", strerror(errno));
00101     close(_socket);
00102     _socket = -1;
00103     return false;
00104     }
00105     return true;
00106 }
00107 
00108 void RH_TCP::clearRxBuf()
00109 {
00110     _rxBufValid = false;
00111     _rxBufLen = 0;
00112 }
00113 
00114 void RH_TCP::checkForEvents()
00115 {
00116     #define RH_TCP_SOCKETBUF_LEN 500
00117     static uint8_t socketBuf[RH_TCP_SOCKETBUF_LEN]; // Room for several messages
00118     static uint16_t socketBufLen = 0;
00119 
00120     // Read at most the amount of space we have left in the buffer
00121     ssize_t count = read(_socket, socketBuf + socketBufLen, sizeof(socketBuf) - socketBufLen);
00122     if (count < 0)
00123     {
00124     if (errno != EAGAIN)
00125     {
00126         fprintf(stderr,"RH_TCP::checkForEvents read error: %s\n", strerror(errno));
00127         exit(1);
00128     }
00129     }
00130     else if (count == 0)
00131     {
00132     // End of file
00133     fprintf(stderr,"RH_TCP::checkForEvents unexpected end of file on read\n");
00134     exit(1);
00135     }
00136     else
00137     {
00138     socketBufLen += count;
00139     while (socketBufLen >= 5)
00140     {
00141         RHTcpTypeMessage* message = ((RHTcpTypeMessage*)socketBuf);
00142         uint32_t len = ntohl(message->length);
00143         uint32_t messageLen = len + sizeof(message->length);
00144         if (len > sizeof(socketBuf) - sizeof(message->length))
00145         {
00146         // Bogus length
00147         fprintf(stderr, "RH_TCP::checkForEvents read ridiculous length: %d. Corrupt message stream? Aborting\n", len);
00148         exit(1);
00149         }
00150         if (socketBufLen >= len + sizeof(message->length))
00151         {
00152         // Got at least all of this message
00153         if (message->type == RH_TCP_MESSAGE_TYPE_PACKET && len >= 5)
00154         {
00155             // REVISIT: need to check if we are actually receiving?
00156             // Its a new packet, extract the headers and payload
00157             RHTcpPacket* packet = ((RHTcpPacket*)socketBuf);
00158             _rxHeaderTo    = packet->to;
00159             _rxHeaderFrom  = packet->from;
00160             _rxHeaderId    = packet->id;
00161             _rxHeaderFlags = packet->flags;
00162             uint32_t payloadLen = len - 5;
00163             if (payloadLen <= sizeof(_rxBuf))
00164             {
00165             // Enough room in our receiver buffer
00166             memcpy(_rxBuf, packet->payload, payloadLen);
00167             _rxBufLen = payloadLen;
00168             _rxBufFull = true;
00169             }
00170         }
00171         // check for other message types here
00172         // Now remove the used message by copying the trailing bytes (maybe start of a new message?)
00173         // to the top of the buffer
00174         memcpy(socketBuf, socketBuf + messageLen, sizeof(socketBuf) - messageLen);
00175         socketBufLen -= messageLen;
00176         }
00177     }
00178     }
00179 }
00180 
00181 void RH_TCP::validateRxBuf()
00182 {
00183     // The headers have already been extracted
00184     if (_promiscuous ||
00185     _rxHeaderTo == _thisAddress ||
00186     _rxHeaderTo == RH_BROADCAST_ADDRESS)
00187     {
00188     _rxGood++;
00189     _rxBufValid = true;
00190     }
00191 }
00192 
00193 bool RH_TCP::available()
00194 {
00195     if (_socket < 0)
00196     return false;
00197     checkForEvents();
00198     if (_rxBufFull)
00199     {
00200     validateRxBuf();
00201     _rxBufFull= false;
00202     }
00203     return _rxBufValid;
00204 }
00205 
00206 // Block until something is available
00207 void RH_TCP::waitAvailable()
00208 {
00209     waitAvailableTimeout(0); // 0 = Wait forever
00210 }
00211 
00212 // Block until something is available or timeout expires
00213 bool RH_TCP::waitAvailableTimeout(uint16_t timeout)
00214 {
00215     int            max_fd;
00216     fd_set         input;
00217     int            result;
00218 
00219     FD_ZERO(&input);
00220     FD_SET(_socket, &input);
00221     max_fd = _socket + 1;
00222 
00223     if (timeout)
00224     {
00225     struct timeval timer;
00226     // Timeout is in milliseconds
00227     timer.tv_sec  = timeout / 1000;
00228     timer.tv_usec = (timeout % 1000) * 1000;
00229     result = select(max_fd, &input, NULL, NULL, &timer);
00230     }
00231     else
00232     {
00233     result = select(max_fd, &input, NULL, NULL, NULL);
00234     }
00235     if (result < 0)
00236     fprintf(stderr, "RH_TCP::waitAvailableTimeout: select failed %s\n", strerror(errno));
00237     return result > 0;
00238 }
00239 
00240 bool RH_TCP::recv(uint8_t* buf, uint8_t* len)
00241 {
00242     if (!available())
00243     return false;
00244 
00245     if (buf && len)
00246     {
00247     if (*len > _rxBufLen)
00248         *len = _rxBufLen;
00249     memcpy(buf, _rxBuf, *len);
00250     }
00251     clearRxBuf();
00252     return true;
00253 }
00254 
00255 bool RH_TCP::send(const uint8_t* data, uint8_t len)
00256 {
00257     bool ret = sendPacket(data, len);
00258     delay(10); // Wait for transmit to succeed. REVISIT: depends on length and speed
00259     return ret;
00260 }
00261 
00262 uint8_t RH_TCP::maxMessageLength()
00263 {
00264     return RH_TCP_MAX_MESSAGE_LEN;
00265 }
00266 
00267 void RH_TCP::setThisAddress(uint8_t address)
00268 {
00269     RHGenericDriver::setThisAddress(address);
00270     sendThisAddress(_thisAddress);
00271 }
00272 
00273 bool RH_TCP::sendThisAddress(uint8_t thisAddress)
00274 {
00275     if (_socket < 0)
00276     return false;
00277     RHTcpThisAddress m;
00278     m.length = htonl(2);
00279     m.type = RH_TCP_MESSAGE_TYPE_THISADDRESS;
00280     m.thisAddress = thisAddress;
00281     ssize_t sent = write(_socket, &m, sizeof(m));
00282     return sent > 0;
00283 }
00284 
00285 bool RH_TCP::sendPacket(const uint8_t* data, uint8_t len)
00286 {
00287     if (_socket < 0)
00288     return false;
00289     RHTcpPacket m;
00290     m.length = htonl(len + 4);
00291     m.type  = RH_TCP_MESSAGE_TYPE_PACKET;
00292     m.to    = _txHeaderTo;
00293     m.from  = _txHeaderFrom;
00294     m.id    = _txHeaderId;
00295     m.flags = _txHeaderFlags;
00296     memcpy(m.payload, data, len);
00297     ssize_t sent = write(_socket, &m, len + 8);
00298     return sent > 0;
00299 }
00300 
00301 #endif