ON Semiconductor / mbed-os

Dependents:   mbed-TFT-example-NCS36510 mbed-Accelerometer-example-NCS36510 mbed-Accelerometer-example-NCS36510

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers udp_link_layer_auto.py Source File

udp_link_layer_auto.py

00001 """
00002 mbed SDK
00003 Copyright (c) 2011-2013 ARM Limited
00004 
00005 Licensed under the Apache License, Version 2.0 (the "License");
00006 you may not use this file except in compliance with the License.
00007 You may obtain a copy of the License at
00008 
00009     http://www.apache.org/licenses/LICENSE-2.0
00010 
00011 Unless required by applicable law or agreed to in writing, software
00012 distributed under the License is distributed on an "AS IS" BASIS,
00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 See the License for the specific language governing permissions and
00015 limitations under the License.
00016 """
00017 
00018 """
00019 How to use:
00020 make.py -m LPC1768 -t ARM -d E:\ -n NET_14
00021 udp_link_layer_auto.py -p COM20 -d E:\ -t 10
00022 """
00023 
00024 import re
00025 import uuid
00026 import socket
00027 import thread
00028 from sys import stdout
00029 from time import time, sleep
00030 from host_test import DefaultTest
00031 from SocketServer import BaseRequestHandler, UDPServer
00032 
00033 
00034 # Received datagrams (with time)
00035 dict_udp_recv_datagrams = dict()
00036 
00037 # Sent datagrams (with time)
00038 dict_udp_sent_datagrams = dict()
00039 
00040 
00041 class UDPEchoClient_Handler(BaseRequestHandler):
00042     def handle(self):
00043         """ One handle per connection
00044         """
00045         _data, _socket = self.request
00046         # Process received datagram
00047         data_str = repr(_data)[1:-1]
00048         dict_udp_recv_datagrams[data_str] = time()
00049 
00050 
00051 def udp_packet_recv (threadName, server_ip, server_port):
00052     """ This function will receive packet stream from mbed device
00053     """
00054     server = UDPServer((server_ip, server_port), UDPEchoClient_Handler)
00055     print "[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port)
00056     server.serve_forever()
00057 
00058 
00059 class UDPEchoServerTest(DefaultTest):
00060     ECHO_SERVER_ADDRESS = ""       # UDP IP of datagram bursts
00061     ECHO_PORT = 0                  # UDP port for datagram bursts
00062     CONTROL_PORT = 23              # TCP port used to get stats from mbed device, e.g. counters
00063     s = None                       # Socket
00064 
00065     TEST_PACKET_COUNT = 1000       # how many packets should be send
00066     TEST_STRESS_FACTOR = 0.001     # stress factor: 10 ms
00067     PACKET_SATURATION_RATIO = 29.9 # Acceptable packet transmission in %
00068 
00069     PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
00070     re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
00071 
00072     def get_control_data(self, command="stat\n"):
00073         BUFFER_SIZE = 256
00074         try:
00075             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00076             s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
00077         except Exception, e:
00078             data = None
00079         s.send(command)
00080         data = s.recv(BUFFER_SIZE)
00081         s.close()
00082         return data
00083 
00084     def test(self):
00085         serial_ip_msg = self.mbed.serial_readline()
00086         if serial_ip_msg is None:
00087             return self.RESULT_IO_SERIAL
00088         stdout.write(serial_ip_msg)
00089         stdout.flush()
00090         # Searching for IP address and port prompted by server
00091         m = self.re_detect_server_ip.search(serial_ip_msg)
00092         if m and len(m.groups()):
00093             self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
00094             self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
00095             self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
00096 
00097         # Open client socket to burst datagrams to UDP server in mbed
00098         try:
00099             self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
00100         except Exception, e:
00101             self.s = None
00102             self.notify("HOST: Error: %s"% e)
00103             return self.RESULT_ERROR
00104 
00105         # UDP replied receiver works in background to get echoed datagrams
00106         SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
00107         SERVER_PORT = self.ECHO_PORT + 1
00108         thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT))
00109         sleep(0.5)
00110 
00111         # Burst part
00112         for no in range(self.TEST_PACKET_COUNT):
00113             TEST_STRING = str(uuid.uuid4())
00114             payload = str(no) + "__" + TEST_STRING
00115             self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
00116             dict_udp_sent_datagrams[payload] = time()
00117             sleep(self.TEST_STRESS_FACTOR)
00118 
00119         if self.s is not None:
00120             self.s.close()
00121 
00122         # Wait 5 seconds for packets to come
00123         result = True
00124         self.notify("HOST: Test Summary:")
00125         for d in range(5):
00126             sleep(1.0)
00127             summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0
00128             self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d,
00129                                                                                                      summary_datagram_success,
00130                                                                                                      len(dict_udp_recv_datagrams),
00131                                                                                                      self.TEST_PACKET_COUNT,
00132                                                                                                      self.TEST_STRESS_FACTOR))
00133             result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
00134             stdout.flush()
00135 
00136         # Getting control data from test
00137         self.notify("...")
00138         self.notify("HOST: Mbed Summary:")
00139         mbed_stats = self.get_control_data()
00140         self.notify(mbed_stats)
00141         return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
00142 
00143 
00144 if __name__ == '__main__':
00145     UDPEchoServerTest().run()