Tests for the NetworkSocketAPI
Dependents: BSDInterfaceTests ESP8266InterfaceTests LWIPInterfaceTests SpwfInterface_NSAPI_Tests ... more
NSAPITestServer.py
- Committer:
- Brian Daniels
- Date:
- 2016-03-02
- Revision:
- 3:8b595ee6219d
- Parent:
- 1:796ba8b082b1
File content as of revision 3:8b595ee6219d:
#!/usr/bin/env python import socket import signal import sys runServer = True def signal_handler(signal, frame): global runServer print "Handling interrupt" runServer = False def handle_recv(message) : if message == "{{start}}": return None else: return message def run_tcp_server(port): global runServer host = 'localhost' backlog = 5 size = 1024 tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_socket.settimeout(1.0) tcp_socket.bind((host,port)) tcp_socket.listen(backlog) while runServer: try: client, address = tcp_socket.accept() data = client.recv(size) if data: print "TCP DATA from %s: %s" % (address, data) message = handle_recv(data) if message: client.send(message) client.close() except socket.timeout: pass except IOError: pass def run_udp_server(port): global runServer host = 'localhost' backlog = 5 size = 1024 udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_socket.settimeout(1.0) udp_socket.bind((host,port)) while runServer: try: data, address = udp_socket.recvfrom(size) if data: print "UDP DATA from %s: %s" % (addr, data) message = handle_recv(data) if message: udp_socket.sendto(message, address) except socket.timeout: pass except IOError: pass def main(arguments): global thread_stop_event signal.signal(signal.SIGINT, signal_handler) socket_type = arguments[0] port = int(arguments[1]) if socket_type == "TCP": run_tcp_server(port) elif socket_type == "UDP": run_udp_server(port) else: print "Invalid socket type, must be 'TCP' or 'UDP'" sys.exit(1) sys.exit(0) if __name__ == "__main__": main(sys.argv[1:])