Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: MAX44000 PWM_Tone_Library nexpaq_mdk
Fork of LED_Demo by
udp_echo_client.py
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import sys
import socket
from sys import stdout
from threading import Thread

try:
    # Python 3 name of the module.
    from socketserver import BaseRequestHandler, UDPServer
except ImportError:
    # Python 2 name of the module.
    from SocketServer import BaseRequestHandler, UDPServer

from mbed_host_tests import BaseHostTest, event_callback


class UDPEchoClientHandler(BaseRequestHandler):
    """Request handler that echoes every received datagram to its sender."""

    def handle(self):
        """
        UDP packet handler. Echoes data back to sender's address.

        For a UDP server ``self.request`` is a ``(data, socket)`` pair.
        """
        data, sock = self.request
        print ('HOST: UDPEchoClientHandler: Rx: \n%s\n' % data)
        sock.sendto(data, self.client_address)


class UDPEchoClientTest(BaseHostTest):
    """
    Host-side half of the UDP echo client test.

    Once the target reports its IP address, a UDP echo server is started on
    the host interface that routes to the target. The server's IP and port
    are sent back on request through the "host_ip" / "host_port" keys.
    """

    def __init__(self):
        """
        Initialise test parameters.

        :return:
        """
        BaseHostTest.__init__(self)
        self.SERVER_IP = None  # Will be determined after knowing the target IP
        self.SERVER_PORT = 0  # Let UDPServer choose an arbitrary port
        self.server = None
        self.server_thread = None
        self.target_ip = None

    @staticmethod
    def find_interface_to_target_addr(target_ip):
        """
        Finds IP address of the interface through which it is connected to the target.

        A datagram socket is connected to the target and the local address
        the OS selected for that socket is read back.

        :param target_ip: IP address of the target.
        :return: local IP address of the socket connected to ``target_ip``.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect((target_ip, 0))  # Target IP, Any port
            return s.getsockname()[0]
        finally:
            # Close the probe socket even when connect()/getsockname() raises.
            s.close()

    def setup_udp_server(self):
        """
        sets up a UDP server for target to connect and send test data.

        Requires ``self.SERVER_IP`` to be set. If it is not, the failure is
        logged, the test is reported as failed through
        ``notify_complete(False)`` and no server is started.

        On success ``self.server``, ``self.SERVER_PORT`` and
        ``self.server_thread`` are updated and the server runs in its own
        thread.

        :return: None in every path. (Original author's note: returning None
                 will suppress host test from printing success code.)
        """
        # !NOTE: There should mechanism to assert in the host test
        if self.SERVER_IP is None:
            self.log("setup_udp_server() called before determining server IP!")
            self.notify_complete(False)
            # Without an address there is nothing to bind to; stop here.
            return

        # allow_reuse_address is only consulted by server_bind(), so the
        # socket is created unbound, the flag is set, and only then is the
        # server bound and activated.
        server = UDPServer((self.SERVER_IP, self.SERVER_PORT), UDPEchoClientHandler,
                           bind_and_activate=False)
        server.allow_reuse_address = True
        try:
            server.server_bind()
            server.server_activate()
        except Exception:
            # Do not leak the socket when binding fails.
            server.server_close()
            raise

        self.server = server
        # With a requested port of 0, the port actually bound is read back.
        self.SERVER_PORT = server.server_address[1]
        self.log("HOST: Listening for UDP packets: " + self.SERVER_IP + ":" + str(self.SERVER_PORT))
        self.server_thread = Thread(target=UDPEchoClientTest.server_thread_func, args=(self,))
        self.server_thread.start()

    @staticmethod
    def server_thread_func(this):
        """
        Thread function to run UDP server forever.

        :param this: the UDPEchoClientTest instance whose ``server`` is served.
        :return:
        """
        this.server.serve_forever()

    @event_callback("target_ip")
    def _callback_target_ip(self, key, value, timestamp):
        """
        Callback to handle reception of target's IP address.

        Stores the target IP, derives the host interface address from it and
        starts the UDP echo server.

        :param key:
        :param value: the target's IP address.
        :param timestamp:
        :return:
        """
        self.target_ip = value
        self.SERVER_IP = self.find_interface_to_target_addr(self.target_ip)
        self.setup_udp_server()

    @event_callback("host_ip")
    def _callback_host_ip(self, key, value, timestamp):
        """
        Callback for request for host IP Addr

        Replies with the current ``self.SERVER_IP`` under the "host_ip" key.
        """
        self.send_kv("host_ip", self.SERVER_IP)

    @event_callback("host_port")
    def _callback_host_port(self, key, value, timestamp):
        """
        Callback for request for host port

        Replies with the current ``self.SERVER_PORT`` under the "host_port" key.
        """
        self.send_kv("host_port", self.SERVER_PORT)

    def teardown(self):
        """
        Stops the echo server, if one was started.

        The serve loop is asked to stop, the server thread is joined, and
        the listening socket is then closed.
        """
        if self.server:
            self.server.shutdown()
            self.server_thread.join()
            # shutdown() only stops serve_forever(); the socket must be
            # closed explicitly.
            self.server.server_close()
Generated on Tue Jul 12 2022 12:28:56 by
