ON Semiconductor / mbed-os

Dependents:   mbed-TFT-example-NCS36510 mbed-Accelerometer-example-NCS36510 mbed-Accelerometer-example-NCS36510

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers udp_shotgun.py Source File

udp_shotgun.py

00001 """
00002 mbed SDK
00003 Copyright (c) 2011-2013 ARM Limited
00004 
00005 Licensed under the Apache License, Version 2.0 (the "License");
00006 you may not use this file except in compliance with the License.
00007 You may obtain a copy of the License at
00008 
00009     http://www.apache.org/licenses/LICENSE-2.0
00010 
00011 Unless required by applicable law or agreed to in writing, software
00012 distributed under the License is distributed on an "AS IS" BASIS,
00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 See the License for the specific language governing permissions and
00015 limitations under the License.
00016 """
00017 
00018 import sys
00019 import socket
00020 import json
00021 import random
00022 import itertools
00023 from sys import stdout
00024 from threading import Thread
00025 from SocketServer import BaseRequestHandler, UDPServer
00026 from mbed_host_tests import BaseHostTest, event_callback
00027 
00028 
class UDPEchoClientHandler(BaseRequestHandler):
    """Datagram handler that answers one request with a burst of packets."""

    def handle(self):
        """ UDP packet handler. Responds with multiple simultaneous packets

        Each byte of the request encodes the size of one packet to send
        back. The size is stored shifted right by 4 so that it fits in a
        single byte, which avoids any issues with endianness or decoding.

        Every reply consists of ``size - 1`` random bytes followed by one
        byte holding the XOR of those random bytes, so XOR-ing a whole
        reply yields 0. A request byte of 0 produces an empty datagram.

        :return:
        """
        request, sock = self.request

        # bytearray yields integers on both Python 2 and Python 3, so no
        # ord()/chr() round trip is needed.
        sizes = [encoded << 4 for encoded in bytearray(request)]

        for size in sizes:
            payload = bytearray(
                random.randint(0, 255) for _ in range(max(size - 1, 0)))
            if size > 0:
                # Trailing checksum byte: XOR of all random bytes.
                checksum = 0
                for byte in payload:
                    checksum ^= byte
                payload.append(checksum)
            sock.sendto(bytes(payload), self.client_address)
00044 
00045 
class UDPEchoClientTest(BaseHostTest):
    """Host test that serves UDP packet bursts to the target under test."""

    def __init__(self):
        """
        Initialise test parameters.

        :return:
        """
        BaseHostTest.__init__(self)
        self.SERVER_IP = None  # Will be determined after knowing the target IP
        self.SERVER_PORT = 0   # Let UDPServer choose an arbitrary port
        self.server = None
        self.server_thread = None
        self.target_ip = None

    @staticmethod
    def find_interface_to_target_addr(target_ip):
        """
        Finds IP address of the interface through which it is connected to the target.

        :param target_ip: IP address of the target
        :return: IP address of the local interface used to reach the target
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            try:
                s.connect((target_ip, 0))  # Target IP, any port
            except socket.error:
                # Some network stacks refuse to connect a datagram socket
                # to port 0; retry with an arbitrary non-zero port.
                s.connect((target_ip, 8000))
            return s.getsockname()[0]
        finally:
            # Close the probe socket even when connect() raises.
            s.close()

    def setup_udp_server(self):
        """
        sets up a UDP server for target to connect and send test data.

        :return:
        """
        # !NOTE: There should mechanism to assert in the host test
        if self.SERVER_IP is None:
            self.log("setup_udp_server() called before determining server IP!")
            self.notify_complete(False)
            # Without an address there is nothing to bind to.
            return

        # allow_reuse_address is only consulted while binding, so create the
        # server unbound, set the flag, and bind afterwards.
        server = UDPServer((self.SERVER_IP, self.SERVER_PORT),
                           UDPEchoClientHandler, bind_and_activate=False)
        server.allow_reuse_address = True
        try:
            server.server_bind()
            server.server_activate()
        except Exception:
            server.server_close()
            raise

        # Port 0 was requested, so read back the port that was actually bound.
        self.SERVER_PORT = server.server_address[1]
        self.log("HOST: Listening for UDP packets: " + self.SERVER_IP + ":" + str(self.SERVER_PORT))

        # server_thread_func reads self.server, so publish it before starting.
        self.server = server
        thread = Thread(target=UDPEchoClientTest.server_thread_func, args=(self,))
        thread.start()
        self.server_thread = thread

    @staticmethod
    def server_thread_func(this):
        """
        Thread function to run UDP server forever.

        :param this: test instance whose ``server`` is to be run
        :return:
        """
        this.server.serve_forever()

    @event_callback("target_ip")
    def _callback_target_ip(self, key, value, timestamp):
        """
        Callback to handle reception of target's IP address.

        :param key:
        :param value: IP address reported by the target
        :param timestamp:
        :return:
        """
        self.target_ip = value
        self.SERVER_IP = self.find_interface_to_target_addr(self.target_ip)
        self.setup_udp_server()

    @event_callback("host_ip")
    def _callback_host_ip(self, key, value, timestamp):
        """
        Callback for request for host IP Addr

        :param key:
        :param value:
        :param timestamp:
        :return:
        """
        self.send_kv("host_ip", self.SERVER_IP)

    @event_callback("host_port")
    def _callback_host_port(self, key, value, timestamp):
        """
        Callback for request for host port

        :param key:
        :param value:
        :param timestamp:
        :return:
        """
        self.send_kv("host_port", self.SERVER_PORT)

    def teardown(self):
        """
        Stops the UDP server thread and releases the listening socket.

        :return:
        """
        if self.server is None:
            return
        # shutdown() waits for serve_forever() to exit, so only call it
        # when the serving thread was actually started.
        if self.server_thread is not None:
            self.server.shutdown()
            self.server_thread.join()
            self.server_thread = None
        self.server.server_close()
        self.server = None