Marco Zecchini
/
Example_RTOS
Rtos API example
Embed:
(wiki syntax)
Show/hide line numbers
udp_echo.py
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import sys
import socket
from sys import stdout
from threading import Thread

try:
    # Python 2 module name (the one this file originally imported).
    from SocketServer import BaseRequestHandler, UDPServer
except ImportError:
    # Python 3 renamed the module to lower case.
    from socketserver import BaseRequestHandler, UDPServer

from mbed_host_tests import BaseHostTest, event_callback


class UDPEchoClientHandler(BaseRequestHandler):
    """Request handler that echoes each received datagram to its sender."""

    def handle(self):
        """ UDP packet handler. Echoes data back to sender's address.

        ``self.request`` is unpacked as a ``(data, socket)`` pair; the data is
        sent back unchanged through that socket to ``self.client_address``.
        """
        data, sock = self.request
        sock.sendto(data, self.client_address)


class UDPEchoClientTest(BaseHostTest):
    """
    Host side of the UDP echo client test.

    Once the target reports its IP address, a UDP echo server is started on
    the host interface that routes to the target. The host IP and port are
    sent back to the target on request.
    """

    def __init__(self):
        """
        Initialise test parameters.

        :return:
        """
        BaseHostTest.__init__(self)
        self.SERVER_IP = None  # Will be determined after knowing the target IP
        self.SERVER_PORT = 0  # Let UDPServer choose an arbitrary port
        self.server = None
        self.server_thread = None
        self.target_ip = None

    @staticmethod
    def find_interface_to_target_addr(target_ip):
        """
        Finds IP address of the interface through which it is connected to the target.

        A datagram socket is connected to the target and the local address
        selected for that socket is returned.

        :param target_ip: IP address of the target
        :return: IP address of the local interface, as reported by getsockname()
        :raises socket.error: if neither connect attempt succeeds
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            try:
                s.connect((target_ip, 0))  # Target IP, any port
            except socket.error:
                s.connect((target_ip, 8000))  # Target IP, 'random' port
            return s.getsockname()[0]
        finally:
            # Always release the probe socket, even when both connects fail.
            s.close()

    def setup_udp_server(self):
        """
        sets up a UDP server for target to connect and send test data.

        Binds a UDPServer to (SERVER_IP, SERVER_PORT), stores the port that was
        actually bound in SERVER_PORT and starts a thread that serves requests
        until teardown() shuts the server down.

        If SERVER_IP has not been determined yet, the failure is logged, the
        test is notified as failed and no server is created.

        :return:
        """
        # !NOTE: There should be a mechanism to assert in the host test
        if self.SERVER_IP is None:
            self.log("setup_udp_server() called before determining server IP!")
            self.notify_complete(False)
            # There is no address to bind to, so stop here instead of letting
            # UDPServer fail on a None host.
            # Returning none will suppress host test from printing success code
            return

        # allow_reuse_address is only consulted while binding, so the server is
        # created unbound, configured, and then bound/activated explicitly.
        server = UDPServer(
            (self.SERVER_IP, self.SERVER_PORT),
            UDPEchoClientHandler,
            bind_and_activate=False
        )
        server.allow_reuse_address = True
        try:
            server.server_bind()
            server.server_activate()
        except Exception:
            # Do not leak the socket when binding fails; let the caller see the error.
            server.server_close()
            raise

        self.server = server
        ip, port = self.server.server_address
        self.SERVER_PORT = port  # Actual port when 0 (arbitrary) was requested
        self.log("HOST: Listening for UDP packets: " + self.SERVER_IP + ":" + str(self.SERVER_PORT))
        self.server_thread = Thread(target=UDPEchoClientTest.server_thread_func, args=(self,))
        self.server_thread.start()

    @staticmethod
    def server_thread_func(this):
        """
        Thread function to run UDP server forever.

        :param this: UDPEchoClientTest instance whose ``server`` is served
        :return:
        """
        this.server.serve_forever()

    @event_callback("target_ip")
    def _callback_target_ip(self, key, value, timestamp):
        """
        Callback to handle reception of target's IP address.

        Stores the target IP, works out which host interface reaches it and
        starts the UDP echo server on that interface.

        :param key:
        :param value: target IP address
        :param timestamp:
        :return:
        """
        self.target_ip = value
        self.SERVER_IP = self.find_interface_to_target_addr(self.target_ip)
        self.setup_udp_server()

    @event_callback("host_ip")
    def _callback_host_ip(self, key, value, timestamp):
        """
        Callback for request for host IP Addr

        Sends the current SERVER_IP back under the "host_ip" key.
        """
        self.send_kv("host_ip", self.SERVER_IP)

    @event_callback("host_port")
    def _callback_host_port(self, key, value, timestamp):
        """
        Callback for request for host port

        Sends the current SERVER_PORT back under the "host_port" key.
        """
        self.send_kv("host_port", self.SERVER_PORT)

    def teardown(self):
        """
        Stops the echo server, if one was started, and releases its socket.
        """
        if self.server:
            if self.server_thread is not None:
                # shutdown() waits for serve_forever() to finish, so only call
                # it when a serving thread was actually created.
                self.server.shutdown()
                self.server_thread.join()
            # shutdown() only stops the serve loop; close the socket explicitly.
            self.server.server_close()
Generated on Sun Jul 17 2022 08:25:32 by 1.7.2