joey shelton / LED_Demo

Dependencies:   MAX44000 PWM_Tone_Library nexpaq_mdk

Fork of LED_Demo by Maxim nexpaq

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers udp_echo_client.py Source File

udp_echo_client.py

00001 """
00002 mbed SDK
00003 Copyright (c) 2011-2013 ARM Limited
00004 
00005 Licensed under the Apache License, Version 2.0 (the "License");
00006 you may not use this file except in compliance with the License.
00007 You may obtain a copy of the License at
00008 
00009     http://www.apache.org/licenses/LICENSE-2.0
00010 
00011 Unless required by applicable law or agreed to in writing, software
00012 distributed under the License is distributed on an "AS IS" BASIS,
00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 See the License for the specific language governing permissions and
00015 limitations under the License.
00016 """
00017 
00018 import sys
00019 import socket
00020 from sys import stdout
00021 from threading import Thread
00022 from SocketServer import BaseRequestHandler, UDPServer
00023 from mbed_host_tests import BaseHostTest, event_callback
00024 
00025 
class UDPEchoClientHandler(BaseRequestHandler):
    """Request handler that echoes every received UDP datagram to its sender."""

    def handle(self):
        """Print the received datagram, then send it back unchanged.

        ``self.request`` is unpacked as a ``(payload, socket)`` pair; the
        payload is returned through that socket to ``self.client_address``.
        """
        payload, reply_socket = self.request
        message = 'HOST: UDPEchoClientHandler: Rx: \n%s\n' % payload
        print (message)
        reply_socket.sendto(payload, self.client_address)
00033 
00034 
class UDPEchoClientTest(BaseHostTest):
    """Host-side half of the UDP echo client test.

    When the target reports its IP address ("target_ip" event) a UDP echo
    server is started on the host interface that routes to the target. The
    server's address is sent back on the "host_ip" and "host_port" events.
    """

    def __init__(self):
        """
        Initialise test parameters.

        :return:
        """
        BaseHostTest.__init__(self)
        self.SERVER_IP = None  # Will be determined after knowing the target IP
        self.SERVER_PORT = 0   # Let UDPServer choose an arbitrary port
        self.server = None
        self.server_thread = None
        self.target_ip = None

    @staticmethod
    def find_interface_to_target_addr(target_ip):
        """
        Finds IP address of the interface through which it is connected to the target.

        A datagram socket is connected to the target so that a local address
        gets assigned to it; that local address is then read back.

        :param target_ip: IP address of the target.
        :return: Local IP address of the socket connected to the target.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect((target_ip, 0))  # Target IP, Any port
            return s.getsockname()[0]
        finally:
            # Always release the probe socket, even if connect() fails.
            s.close()

    def setup_udp_server(self):
        """
        Sets up a UDP server for target to connect and send test data.

        If the server IP has not been determined yet, the failure is logged,
        the test is completed with a failed result and no server is started.

        :return: None. (Original note: returning None will suppress host test
                 from printing success code.)
        """
        # !NOTE: There should be a mechanism to assert in the host test
        if self.SERVER_IP is None:
            self.log("setup_udp_server() called before determining server IP!")
            self.notify_complete(False)
            # Do not fall through and try to bind a server to a None address.
            return

        # allow_reuse_address is only consulted while binding, so defer the
        # bind until after the flag has been set on this instance.
        server = UDPServer((self.SERVER_IP, self.SERVER_PORT), UDPEchoClientHandler,
                           bind_and_activate=False)
        server.allow_reuse_address = True
        try:
            server.server_bind()
            server.server_activate()
        except Exception:
            # Release the socket if binding fails, then propagate the error.
            server.server_close()
            raise

        self.server = server
        # With port 0 requested, read back the port that was actually bound.
        self.SERVER_PORT = self.server.server_address[1]
        self.log("HOST: Listening for UDP packets: " + self.SERVER_IP + ":" + str(self.SERVER_PORT))
        self.server_thread = Thread(target=UDPEchoClientTest.server_thread_func, args=(self,))
        self.server_thread.start()

    @staticmethod
    def server_thread_func(this):
        """
        Thread function to run UDP server forever.

        :param this: UDPEchoClientTest instance whose server is to be run.
        :return:
        """
        this.server.serve_forever()

    @event_callback("target_ip")
    def _callback_target_ip(self, key, value, timestamp):
        """
        Callback to handle reception of target's IP address.

        Stores the target IP, derives the host interface IP from it and
        starts the UDP server.

        :param key:
        :param value: Target's IP address.
        :param timestamp:
        :return:
        """
        self.target_ip = value
        self.SERVER_IP = self.find_interface_to_target_addr(self.target_ip)
        self.setup_udp_server()

    @event_callback("host_ip")
    def _callback_host_ip(self, key, value, timestamp):
        """
        Callback for request for host IP Addr

        Replies with the current value of SERVER_IP.
        """
        self.send_kv("host_ip", self.SERVER_IP)

    @event_callback("host_port")
    def _callback_host_port(self, key, value, timestamp):
        """
        Callback for request for host port

        Replies with the current value of SERVER_PORT.
        """
        self.send_kv("host_port", self.SERVER_PORT)

    def teardown(self):
        """
        Stops the UDP server, waits for its thread and closes its socket.

        Does nothing if no server was started.
        """
        if self.server:
            self.server.shutdown()
            if self.server_thread is not None:
                self.server_thread.join()
            # shutdown() only stops serve_forever(); the socket must be
            # closed explicitly.
            self.server.server_close()