fixed buffer management in case of packet fragmentation; improved test pattern with pseudo random to avoid pattern simulation
Fork of NSAPITests by
EchoServer.py
- Committer:
- mapellil
- Date:
- 2016-10-26
- Revision:
- 13:950327f445a3
- Parent:
- 12:152ae238ddc1
File content as of revision 13:950327f445a3:
#!/usr/bin/env python import socket import signal import select def main(port): port = int(port) udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) udp.bind(('', port)) udp.setblocking(0) tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp.bind(('', port)) tcp.setblocking(0) udp6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) udp6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) udp6.bind(('', port)) udp6.setblocking(0) tcp6 = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) tcp6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp6.bind(('', port)) tcp6.setblocking(0) print "running on port %d" % port sockets = [tcp, udp, tcp6, udp6] clients = [] while True: select.select(sockets, [], []) try: data, addr = udp.recvfrom(1 << 12) print 'udp %s:%d %d' % (addr[0], addr[1], len(data)) udp.sendto(data, addr) except socket.error: pass try: tcp.listen(5) client, addr = tcp.accept() print 'tcp %s:%d connect' % (addr[0], addr[1]) client.setblocking(0) client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sockets.append(client) clients.append((client, addr)) except socket.error: pass try: data, addr = udp6.recvfrom(1 << 12) print 'udp6 %s:%d %d' % (addr[0], addr[1], len(data)) udp6.sendto(data, addr) except socket.error: pass try: tcp6.listen(5) client, addr = tcp6.accept() print 'tcp6 %s:%d connect' % (addr[0], addr[1]) client.setblocking(0) client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sockets.append(client) clients.append((client, addr)) except socket.error: pass for client, addr in clients: try: data = client.recv(1 << 12) if data: print 'tcp %s:%d %d' % (addr[0], addr[1], len(data)) client.send(data) else: print 'tcp %s:%d disconnect' % (addr[0], addr[1]) sockets.remove(client) clients.remove((client, addr)) client.close() except socket.error: pass if __name__ == "__main__": import sys main(*sys.argv[1:])