Tests for the NetworkSocketAPI

Dependents:   BSDInterfaceTests ESP8266InterfaceTests LWIPInterfaceTests SpwfInterface_NSAPI_Tests ... more

NSAPITestServer.py

Committer:
Brian Daniels
Date:
2016-03-02
Revision:
3:8b595ee6219d
Parent:
1:796ba8b082b1

File content as of revision 3:8b595ee6219d:

#!/usr/bin/env python
import socket
import signal
import sys

# Loop flag read by the server loops in this file; cleared by signal_handler
# to request a clean shutdown.
runServer = True


def signal_handler(signal, frame):
    """Ask the running server loop to stop.

    Has the two-argument signature required of a handler registered with
    ``signal.signal``; both arguments are ignored.

    Args:
        signal: Number of the signal being handled (unused).  Note that
            this parameter shadows the imported ``signal`` module inside
            the function body.
        frame: Stack frame that was interrupted (unused).
    """
    global runServer

    # Call form of print behaves identically on Python 2 and Python 3 for a
    # single string argument.
    print("Handling interrupt")
    runServer = False

def handle_recv(message):
    """Decide what should be sent back for a received payload.

    The ``{{start}}`` marker gets no reply; every other payload is echoed
    back unchanged.

    Args:
        message: Payload as read from a socket, either a text string or a
            byte string.

    Returns:
        None when ``message`` is the ``{{start}}`` marker, otherwise
        ``message`` itself.
    """
    # Sockets hand back ``bytes`` on Python 3 (where bytes never compare
    # equal to str), while on Python 2 ``bytes`` is just ``str``.  Compare
    # against the marker of the matching type so both work.
    start_marker = b"{{start}}" if isinstance(message, bytes) else "{{start}}"
    if message == start_marker:
        return None
    return message


def run_tcp_server(port):
    """Serve one request per TCP connection on localhost until shutdown.

    Each accepted client is read once (up to 1024 bytes).  A non-empty
    payload is printed and passed to ``handle_recv``; a non-empty reply is
    sent back.  The client connection is then closed in every case.  The
    loop runs while the module-level ``runServer`` flag is true.

    Errors raised while creating, binding or listening on the server socket
    propagate to the caller; errors on an individual client are ignored.

    Args:
        port: TCP port number to bind on ``localhost``.
    """
    host = 'localhost'
    backlog = 5
    size = 1024
    tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # The 1 second accept timeout makes the loop re-check runServer
        # regularly even when no client connects.
        tcp_socket.settimeout(1.0)
        tcp_socket.bind((host, port))
        tcp_socket.listen(backlog)

        while runServer:
            try:
                client, address = tcp_socket.accept()
            except (socket.timeout, IOError):
                # Nothing accepted this round; go back and re-check the flag.
                continue

            try:
                data = client.recv(size)
                if data:
                    print("TCP DATA from %s: %s" % (address, data))
                    message = handle_recv(data)
                    if message:
                        # sendall keeps writing until the whole reply is
                        # out; send() may stop after a partial write.
                        client.sendall(message)
            except IOError:
                # Best effort: a misbehaving client must not stop the server.
                pass
            finally:
                # Always release the connection, including when there was
                # nothing to reply or the exchange failed.
                client.close()
    finally:
        tcp_socket.close()


def run_udp_server(port):
    """Answer UDP datagrams on localhost until shutdown.

    Each non-empty datagram (up to 1024 bytes) is printed and passed to
    ``handle_recv``; a non-empty reply is sent back to the sender's address.
    The loop runs while the module-level ``runServer`` flag is true.

    Errors raised while creating or binding the socket propagate to the
    caller; errors while receiving or replying are ignored.

    Args:
        port: UDP port number to bind on ``localhost``.
    """
    host = 'localhost'
    size = 1024
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # The 1 second receive timeout makes the loop re-check runServer
        # regularly even when no datagram arrives.
        udp_socket.settimeout(1.0)
        udp_socket.bind((host, port))

        while runServer:
            try:
                data, address = udp_socket.recvfrom(size)
                if data:
                    # The sender is ``address``; the original referenced an
                    # undefined ``addr`` here and died with a NameError.
                    print("UDP DATA from %s: %s" % (address, data))
                    message = handle_recv(data)
                    if message:
                        udp_socket.sendto(message, address)
            except socket.timeout:
                # No datagram this round; go back and re-check the flag.
                pass
            except IOError:
                # Best effort: one failed exchange must not stop the server.
                pass
    finally:
        udp_socket.close()


def main(arguments):
    """Parse the command line and run the requested test server.

    Invalid input (missing arguments, a non-integer port or an unknown
    socket type) prints a message and exits with status 1.  Otherwise the
    SIGINT handler is installed, the chosen server runs until it returns,
    and the process exits with status 0.

    Args:
        arguments: Command-line arguments without the program name:
            ``[socket_type, port]`` where ``socket_type`` is ``"TCP"`` or
            ``"UDP"`` and ``port`` is an integer string.

    Raises:
        SystemExit: Always; code 0 after a normal server shutdown, 1 on
            invalid arguments.
    """
    if len(arguments) < 2:
        print("Usage: %s <TCP|UDP> <port>" % sys.argv[0])
        sys.exit(1)

    socket_type = arguments[0]
    try:
        port = int(arguments[1])
    except ValueError:
        print("Invalid port %r, must be an integer" % (arguments[1],))
        sys.exit(1)

    if socket_type not in ("TCP", "UDP"):
        print("Invalid socket type, must be 'TCP' or 'UDP'")
        sys.exit(1)

    # Install the handler only once the arguments are known to be good, so
    # Ctrl-C ends the server loop instead of raising KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal_handler)

    if socket_type == "TCP":
        run_tcp_server(port)
    else:
        run_udp_server(port)

    sys.exit(0)

# Script entry point: hand the command-line arguments (minus the program
# name) to main() when this file is executed directly.
if __name__ == "__main__":
    cli_arguments = sys.argv[1:]
    main(cli_arguments)