Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of NSAPITests by
Diff: NSAPITestServer.py
- Revision:
- 3:8b595ee6219d
- Parent:
- 1:796ba8b082b1
diff -r 41bf867fedd2 -r 8b595ee6219d NSAPITestServer.py
--- a/NSAPITestServer.py Wed Mar 02 13:24:46 2016 -0600
+++ b/NSAPITestServer.py Wed Mar 02 16:16:25 2016 -0600
@@ -1,8 +1,92 @@
#!/usr/bin/env python
+import socket
+import signal
+import sys
+
+runServer = True
+
+def signal_handler(signal, frame):
+ global runServer
+
+ print "Handling interrupt"
+ runServer = False
+
+def handle_recv(message) :
+ if message == "{{start}}":
+ return None
+ else:
+ return message
+
+
+def run_tcp_server(port):
+ global runServer
+
+ host = 'localhost'
+ backlog = 5
+ size = 1024
+ tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ tcp_socket.settimeout(1.0)
+ tcp_socket.bind((host,port))
+ tcp_socket.listen(backlog)
+
+ while runServer:
+ try:
+ client, address = tcp_socket.accept()
+ data = client.recv(size)
+ if data:
+ print "TCP DATA from %s: %s" % (address, data)
+ message = handle_recv(data)
+ if message:
+ client.send(message)
+ client.close()
+ except socket.timeout:
+ pass
+ except IOError:
+ pass
-def main():
- pass
+
+def run_udp_server(port):
+ global runServer
+
+ host = 'localhost'
+ backlog = 5
+ size = 1024
+ udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ udp_socket.settimeout(1.0)
+ udp_socket.bind((host,port))
+
-if __name__ != "__main__":
- import sys
- main(*sys.argv[1:])
+ while runServer:
+ try:
+ data, address = udp_socket.recvfrom(size)
+ if data:
+ print "UDP DATA from %s: %s" % (addr, data)
+ message = handle_recv(data)
+ if message:
+ udp_socket.sendto(message, address)
+ except socket.timeout:
+ pass
+ except IOError:
+ pass
+
+
+def main(arguments):
+ global thread_stop_event
+
+ signal.signal(signal.SIGINT, signal_handler)
+
+ socket_type = arguments[0]
+ port = int(arguments[1])
+
+ if socket_type == "TCP":
+ run_tcp_server(port)
+ elif socket_type == "UDP":
+ run_udp_server(port)
+ else:
+ print "Invalid socket type, must be 'TCP' or 'UDP'"
+ sys.exit(1)
+
+ sys.exit(0)
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
\ No newline at end of file
