Marco Zecchini
/
Example_RTOS
Rtos API example
Embed:
(wiki syntax)
Show/hide line numbers
udp_shotgun.py
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import sys
import socket
import json
import random
import itertools
import operator
import time
from functools import reduce
from sys import stdout
from threading import Thread

try:
    from SocketServer import BaseRequestHandler, UDPServer  # Python 2
except ImportError:
    from socketserver import BaseRequestHandler, UDPServer  # Python 3

from mbed_host_tests import BaseHostTest, event_callback


class UDPEchoClientHandler(BaseRequestHandler):
    def handle(self):
        """ UDP packet handler. Responds with multiple simultaneous packets

        Every byte of the request selects one reply datagram. Each reply is
        filled with random bytes and ends with a checksum byte equal to the
        XOR of all preceding bytes.
        """
        data, sock = self.request

        # Each byte in the request indicates the size of a packet to receive.
        # Each packet size is shifted over by 4 to fit in a byte, which
        # avoids any issues with endianness or decoding.
        # bytearray() yields integer byte values for both a Python 2 str and
        # a Python 3 bytes object.
        pattern = [size << 4 for size in bytearray(data)]

        for packet in pattern:
            payload = bytearray(random.randint(0, 255)
                                for _ in range(packet - 1))
            if packet:
                # The initial value 0 is the XOR identity, so the checksum is
                # unchanged while reduce() can no longer fail on empty input.
                payload.append(reduce(operator.xor, payload, 0))
            # A requested size of 0 results in an empty datagram instead of
            # an exception inside the handler.
            sock.sendto(bytes(payload), self.client_address)

            # Sleep a tiny bit to compensate for local network
            time.sleep(0.01)


class UDPEchoClientTest(BaseHostTest):
    def __init__(self):
        """
        Initialise test parameters.

        :return:
        """
        BaseHostTest.__init__(self)
        self.SERVER_IP = None  # Will be determined after knowing the target IP
        self.SERVER_PORT = 0   # Let UDPServer choose an arbitrary port
        self.server = None
        self.server_thread = None
        self.target_ip = None

    @staticmethod
    def find_interface_to_target_addr(target_ip):
        """
        Finds IP address of the interface through which it is connected to the target.

        A datagram socket is connected to the target and the local address
        chosen for that socket is returned.

        :param target_ip: IP address reported by the target.
        :return: local IP address of the socket connected to the target.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            try:
                s.connect((target_ip, 0))  # Target IP, any port
            except socket.error:
                s.connect((target_ip, 8000))  # Target IP, 'random' port
            return s.getsockname()[0]
        finally:
            # Release the socket even when both connect attempts fail.
            s.close()

    def setup_udp_server(self):
        """
        sets up a UDP server for target to connect and send test data.

        The server is bound to SERVER_IP:SERVER_PORT, SERVER_PORT is updated
        with the port actually bound, and the server is run on a separate
        thread. If SERVER_IP is not known yet the test is reported as failed
        and no server is created.

        :return:
        """
        # !NOTE: There should mechanism to assert in the host test
        if self.SERVER_IP is None:
            self.log("setup_udp_server() called before determining server IP!")
            self.notify_complete(False)
            # Returning none will suppress host test from printing success code
            return

        # allow_reuse_address is only consulted while binding, so the server
        # is created unbound, configured, and then bound and activated.
        server = UDPServer((self.SERVER_IP, self.SERVER_PORT),
                           UDPEchoClientHandler, bind_and_activate=False)
        server.allow_reuse_address = True
        try:
            server.server_bind()
            server.server_activate()
        except Exception:
            # Do not leak the socket when binding fails.
            server.server_close()
            raise

        self.server = server
        ip, port = self.server.server_address
        self.SERVER_PORT = port
        self.log("HOST: Listening for UDP packets: " + self.SERVER_IP + ":" + str(self.SERVER_PORT))
        self.server_thread = Thread(target=UDPEchoClientTest.server_thread_func, args=(self,))
        self.server_thread.start()

    @staticmethod
    def server_thread_func(this):
        """
        Thread function to run UDP server forever.

        :param this: the UDPEchoClientTest instance whose server is served.
        :return:
        """
        this.server.serve_forever()

    @event_callback("target_ip")
    def _callback_target_ip(self, key, value, timestamp):
        """
        Callback to handle reception of target's IP address.

        Stores the target IP, derives the host IP from it and starts the
        UDP server.

        :param key:
        :param value: target IP address.
        :param timestamp:
        :return:
        """
        self.target_ip = value
        self.SERVER_IP = self.find_interface_to_target_addr(self.target_ip)
        self.setup_udp_server()

    @event_callback("host_ip")
    def _callback_host_ip(self, key, value, timestamp):
        """
        Callback for request for host IP Addr

        """
        self.send_kv("host_ip", self.SERVER_IP)

    @event_callback("host_port")
    def _callback_host_port(self, key, value, timestamp):
        """
        Callback for request for host port
        """
        self.send_kv("host_port", self.SERVER_PORT)

    def teardown(self):
        """
        Stops the UDP server, waits for its thread and closes its socket.
        """
        if self.server is None:
            return
        if self.server_thread is not None:
            # shutdown() waits for serve_forever() to exit, so it is only
            # called when a serving thread was actually created.
            self.server.shutdown()
            self.server_thread.join()
        self.server.server_close()
Generated on Sun Jul 17 2022 08:25:32 by 1.7.2