Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mbed-TFT-example-NCS36510 mbed-Accelerometer-example-NCS36510 mbed-Accelerometer-example-NCS36510
udp_shotgun.py
00001 """ 00002 mbed SDK 00003 Copyright (c) 2011-2013 ARM Limited 00004 00005 Licensed under the Apache License, Version 2.0 (the "License"); 00006 you may not use this file except in compliance with the License. 00007 You may obtain a copy of the License at 00008 00009 http://www.apache.org/licenses/LICENSE-2.0 00010 00011 Unless required by applicable law or agreed to in writing, software 00012 distributed under the License is distributed on an "AS IS" BASIS, 00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 See the License for the specific language governing permissions and 00015 limitations under the License. 00016 """ 00017 00018 import sys 00019 import socket 00020 import json 00021 import random 00022 import itertools 00023 from sys import stdout 00024 from threading import Thread 00025 from SocketServer import BaseRequestHandler, UDPServer 00026 from mbed_host_tests import BaseHostTest, event_callback 00027 00028 00029 class UDPEchoClientHandler(BaseRequestHandler): 00030 def handle(self): 00031 """ UDP packet handler. Responds with multiple simultaneous packets 00032 """ 00033 data, sock = self.request 00034 pattern = [ord(d) << 4 for d in data] 00035 00036 # Each byte in request indicates size of packet to recieve 00037 # Each packet size is shifted over by 4 to fit in a byte, which 00038 # avoids any issues with endianess or decoding 00039 for packet in pattern: 00040 data = [random.randint(0, 255) for _ in range(packet-1)] 00041 data.append(reduce(lambda a,b: a^b, data)) 00042 data = ''.join(map(chr, data)) 00043 sock.sendto(data, self.client_address) 00044 00045 00046 class UDPEchoClientTest(BaseHostTest): 00047 def __init__(self): 00048 """ 00049 Initialise test parameters. 00050 00051 :return: 00052 """ 00053 BaseHostTest.__init__(self) 00054 self.SERVER_IP = None # Will be determined after knowing the target IP 00055 self.SERVER_PORT = 0 # Let TCPServer choose an arbitrary port 00056 self.server = None 00057 self.server_thread = None 00058 self.target_ip = None 00059 00060 @staticmethod 00061 def find_interface_to_target_addr(target_ip): 00062 """ 00063 Finds IP address of the interface through which it is connected to the target. 00064 00065 :return: 00066 """ 00067 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 00068 s.connect((target_ip, 0)) # Target IP, Any port 00069 ip = s.getsockname()[0] 00070 s.close() 00071 return ip 00072 00073 def setup_udp_server(self): 00074 """ 00075 sets up a UDP server for target to connect and send test data. 00076 00077 :return: 00078 """ 00079 # !NOTE: There should mechanism to assert in the host test 00080 if self.SERVER_IP is None: 00081 self.log("setup_udp_server() called before determining server IP!") 00082 self.notify_complete(False) 00083 00084 # Returning none will suppress host test from printing success code 00085 self.server = UDPServer((self.SERVER_IP, self.SERVER_PORT), UDPEchoClientHandler) 00086 ip, port = self.server.server_address 00087 self.SERVER_PORT = port 00088 self.server.allow_reuse_address = True 00089 self.log("HOST: Listening for UDP packets: " + self.SERVER_IP + ":" + str(self.SERVER_PORT)) 00090 self.server_thread = Thread(target=UDPEchoClientTest.server_thread_func, args=(self,)) 00091 self.server_thread.start() 00092 00093 @staticmethod 00094 def server_thread_func(this): 00095 """ 00096 Thread function to run TCP server forever. 00097 00098 :param this: 00099 :return: 00100 """ 00101 this.server.serve_forever() 00102 00103 @event_callback("target_ip") 00104 def _callback_target_ip(self, key, value, timestamp): 00105 """ 00106 Callback to handle reception of target's IP address. 00107 00108 :param key: 00109 :param value: 00110 :param timestamp: 00111 :return: 00112 """ 00113 self.target_ip = value 00114 self.SERVER_IP = self.find_interface_to_target_addr(self.target_ip) 00115 self.setup_udp_server() 00116 00117 @event_callback("host_ip") 00118 def _callback_host_ip(self, key, value, timestamp): 00119 """ 00120 Callback for request for host IP Addr 00121 00122 """ 00123 self.send_kv("host_ip", self.SERVER_IP) 00124 00125 @event_callback("host_port") 00126 def _callback_host_port(self, key, value, timestamp): 00127 """ 00128 Callback for request for host port 00129 """ 00130 self.send_kv("host_port", self.SERVER_PORT) 00131 00132 def teardown(self): 00133 if self.server: 00134 self.server.shutdown() 00135 self.server_thread.join()
Generated on Tue Jul 12 2022 11:02:57 by
