Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mbed-TFT-example-NCS36510 mbed-Accelerometer-example-NCS36510 mbed-Accelerometer-example-NCS36510
udp_link_layer_auto.py
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# How to use:
#   make.py -m LPC1768 -t ARM -d E:\ -n NET_14
#   udp_link_layer_auto.py -p COM20 -d E:\ -t 10

import re
import uuid
import socket
import thread
from sys import stdout
from time import time, sleep
from host_test import DefaultTest
from SocketServer import BaseRequestHandler, UDPServer


# Received datagrams (with time), keyed by the datagram text.
# Filled by UDPEchoClient_Handler, which runs in the receiver thread.
dict_udp_recv_datagrams = dict()

# Sent datagrams (with time), keyed by the payload that was sent
dict_udp_sent_datagrams = dict()


class UDPEchoClient_Handler(BaseRequestHandler):
    """ Records the arrival time of every datagram delivered to the server
    """
    def handle(self):
        """ One handle per received datagram
        """
        _data, _socket = self.request
        # repr() escapes non-printable bytes; [1:-1] drops the quotes that
        # repr() puts around the text, leaving a printable dictionary key
        data_str = repr(_data)[1:-1]
        dict_udp_recv_datagrams[data_str] = time()


def udp_packet_recv(threadName, server_ip, server_port):
    """ This function will receive packet stream from mbed device

        Binds a UDPServer to (server_ip, server_port) and serves forever, so
        it is meant to be run in a background thread. threadName is accepted
        for the caller's bookkeeping and is not used here.
    """
    server = UDPServer((server_ip, server_port), UDPEchoClient_Handler)
    print("[UDP_COUNTER] Listening for connections... %s:%d" % (server_ip, server_port))
    server.serve_forever()


class UDPEchoServerTest(DefaultTest):
    """ Bursts UDP datagrams at the mbed device and measures how many
        datagrams are received back on the host within a few seconds
    """
    ECHO_SERVER_ADDRESS = ""        # UDP IP of datagram bursts
    ECHO_PORT = 0                   # UDP port for datagram bursts
    CONTROL_PORT = 23               # TCP port used to get stats from mbed device, e.g. counters
    s = None                        # Socket

    TEST_PACKET_COUNT = 1000        # how many packets should be send
    TEST_STRESS_FACTOR = 0.001      # delay between sent datagrams, in seconds (1 ms)
    PACKET_SATURATION_RATIO = 29.9  # Acceptable packet transmission in %

    # Raw string with escaped dots, so that '.' only matches a literal dot
    PATTERN_SERVER_IP = r"Server IP Address is (\d+)\.(\d+)\.(\d+)\.(\d+):(\d+)"
    re_detect_server_ip = re.compile(PATTERN_SERVER_IP)

    def get_control_data(self, command="stat\n"):
        """ Sends command to the device's TCP control port and returns reply

            Returns up to 256 bytes of the reply, or None when the control
            connection cannot be created, established or used.
        """
        BUFFER_SIZE = 256
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            return None
        try:
            s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
            s.send(command)
            return s.recv(BUFFER_SIZE)
        except socket.error:
            # Statistics are informational only, do not abort the test
            return None
        finally:
            s.close()

    def test(self):
        """ Runs the datagram burst and returns one of the RESULT_* values
        """
        serial_ip_msg = self.mbed.serial_readline()
        if serial_ip_msg is None:
            return self.RESULT_IO_SERIAL
        stdout.write(serial_ip_msg)
        stdout.flush()
        # Searching for IP address and port prompted by server
        m = self.re_detect_server_ip.search(serial_ip_msg)
        if m is None:
            # Without an address there is nothing to send datagrams to
            self.notify("HOST: Error: UDP server address not found in serial output")
            return self.RESULT_ERROR
        groups = m.groups()
        self.ECHO_SERVER_ADDRESS = ".".join(groups[:4])
        self.ECHO_PORT = int(groups[4])     # must be integer for socket.connect method
        self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))

        # Open client socket to burst datagrams to UDP server in mbed
        try:
            self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        except socket.error as e:
            self.s = None
            self.notify("HOST: Error: %s" % e)
            return self.RESULT_ERROR

        # UDP replied receiver works in background to get echoed datagrams
        SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
        SERVER_PORT = self.ECHO_PORT + 1
        thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT))
        sleep(0.5)  # give the receiver thread time to bind before the burst

        # Burst part
        try:
            for no in range(self.TEST_PACKET_COUNT):
                TEST_STRING = str(uuid.uuid4())
                payload = str(no) + "__" + TEST_STRING
                self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
                dict_udp_sent_datagrams[payload] = time()
                sleep(self.TEST_STRESS_FACTOR)
        finally:
            # Release the socket even when sending fails half way through
            self.s.close()

        # Wait 5 seconds for packets to come
        # NOTE: the ratio has to be met at every one of the five checks,
        # including the first one taken after 1 second
        result = True
        self.notify("HOST: Test Summary:")
        for d in range(5):
            sleep(1.0)
            received = len(dict_udp_recv_datagrams)
            summary_datagram_success = (float(received) / float(self.TEST_PACKET_COUNT)) * 100.0
            # TEST_STRESS_FACTOR is kept in seconds, the report is in ms
            self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms" % (d,
                summary_datagram_success,
                received,
                self.TEST_PACKET_COUNT,
                self.TEST_STRESS_FACTOR * 1000.0))
            result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
            stdout.flush()

        # Getting control data from test
        self.notify("...")
        self.notify("HOST: Mbed Summary:")
        mbed_stats = self.get_control_data()
        if mbed_stats is not None:
            self.notify(mbed_stats)
        else:
            self.notify("HOST: Unable to read statistics from mbed device")
        return self.RESULT_SUCCESS if result else self.RESULT_FAILURE


if __name__ == '__main__':
    UDPEchoServerTest().run()
Generated on Tue Jul 12 2022 11:02:57 by
