Version of easy-connect with the u-blox cellular platforms C027 and C030 added.

Dependents:   HelloMQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers udp_shotgun.py Source File

udp_shotgun.py

00001 """
00002 mbed SDK
00003 Copyright (c) 2011-2013 ARM Limited
00004 
00005 Licensed under the Apache License, Version 2.0 (the "License");
00006 you may not use this file except in compliance with the License.
00007 You may obtain a copy of the License at
00008 
00009     http://www.apache.org/licenses/LICENSE-2.0
00010 
00011 Unless required by applicable law or agreed to in writing, software
00012 distributed under the License is distributed on an "AS IS" BASIS,
00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 See the License for the specific language governing permissions and
00015 limitations under the License.
00016 """
00017 
00018 import sys
00019 import socket
00020 import json
00021 import random
00022 import itertools
00023 import time
00024 from sys import stdout
00025 from threading import Thread
00026 from SocketServer import BaseRequestHandler, UDPServer
00027 from mbed_host_tests import BaseHostTest, event_callback
00028 
00029 
class UDPEchoClientHandler(BaseRequestHandler):
    """Answers a single UDP request with a burst of checksummed packets."""

    def handle(self):
        """ UDP packet handler. Responds with multiple packets in a burst.

        Every byte of the request describes one reply packet: the byte value
        shifted left by 4 is the reply size. A reply consists of random bytes
        followed by one trailing byte holding the XOR of those random bytes,
        so the XOR over a complete reply is always 0. A request byte of 0
        yields an empty datagram.
        """
        request, sock = self.request

        # Each byte in the request indicates the size of a packet the
        # requester wants to receive. Each packet size is shifted over by 4
        # to fit in a byte, which avoids any issues with endianness or
        # decoding. bytearray() yields integers for both str and bytes
        # payloads, so no ord()/chr() round trip is needed.
        sizes = [value << 4 for value in bytearray(request)]

        for size in sizes:
            payload = bytearray(random.randint(0, 255)
                                for _ in range(size - 1))

            # The XOR checksum takes the last slot of the packet. A size of 0
            # has no room for it (and nothing to checksum), so the payload
            # stays empty instead of failing on an empty reduction.
            if size > 0:
                checksum = 0
                for value in payload:
                    checksum ^= value
                payload.append(checksum)

            sock.sendto(bytes(payload), self.client_address)

            # Sleep a tiny bit to compensate for local network
            time.sleep(0.01)
00048 
00049 
class UDPEchoClientTest(BaseHostTest):
    """Host side of the UDP shotgun test.

    When the target reports its IP address, a UDP server using
    UDPEchoClientHandler is started on the local interface that routes to
    the target. The "host_ip" and "host_port" events are then answered with
    that server's address.
    """

    def __init__(self):
        """
        Initialise test parameters.

        :return:
        """
        BaseHostTest.__init__(self)
        self.SERVER_IP = None  # Will be determined after knowing the target IP
        self.SERVER_PORT = 0   # Let UDPServer choose an arbitrary port
        self.server = None
        self.server_thread = None
        self.target_ip = None

    @staticmethod
    def find_interface_to_target_addr(target_ip):
        """
        Finds IP address of the interface through which it is connected to the target.

        :param target_ip: IP address reported by the target.
        :return: Local IP address of the socket after connecting it towards
                 the target.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            try:
                s.connect((target_ip, 0))  # Target IP, any port
            except socket.error:
                s.connect((target_ip, 8000))  # Target IP, 'random' port
            return s.getsockname()[0]
        finally:
            # Release the probe socket even when both connect attempts fail.
            s.close()

    def setup_udp_server(self):
        """
        Sets up a UDP server for target to connect and send test data.

        Fails the test and returns without starting anything when the server
        IP has not been determined yet.

        :return:
        """
        # !NOTE: There should be a mechanism to assert in the host test
        if self.SERVER_IP is None:
            self.log("setup_udp_server() called before determining server IP!")
            self.notify_complete(False)
            # Without an address there is nothing to bind to.
            return

        # allow_reuse_address is only consulted while binding, so the server
        # is created unbound, configured, and then bound and activated here.
        server = UDPServer((self.SERVER_IP, self.SERVER_PORT),
                           UDPEchoClientHandler, bind_and_activate=False)
        server.allow_reuse_address = True
        try:
            server.server_bind()
            server.server_activate()
        except Exception:
            # Do not leak the socket when binding fails.
            server.server_close()
            raise

        self.server = server
        # Port 0 was requested; record the port that was actually assigned.
        self.SERVER_PORT = server.server_address[1]
        self.log("HOST: Listening for UDP packets: " + self.SERVER_IP + ":" + str(self.SERVER_PORT))
        self.server_thread = Thread(target=UDPEchoClientTest.server_thread_func, args=(self,))
        # Daemon thread: a missed teardown() must not keep the process alive.
        self.server_thread.daemon = True
        self.server_thread.start()

    @staticmethod
    def server_thread_func(this):
        """
        Thread function to run UDP server forever.

        :param this: UDPEchoClientTest instance whose server is run.
        :return:
        """
        this.server.serve_forever()

    @event_callback("target_ip")
    def _callback_target_ip(self, key, value, timestamp):
        """
        Callback to handle reception of target's IP address.

        Stores the address, picks the matching local interface and starts
        the UDP server on it.

        :param key:
        :param value: Target IP address.
        :param timestamp:
        :return:
        """
        self.target_ip = value
        self.SERVER_IP = self.find_interface_to_target_addr(self.target_ip)
        self.setup_udp_server()

    @event_callback("host_ip")
    def _callback_host_ip(self, key, value, timestamp):
        """
        Callback for request for host IP Addr

        """
        self.send_kv("host_ip", self.SERVER_IP)

    @event_callback("host_port")
    def _callback_host_port(self, key, value, timestamp):
        """
        Callback for request for host port
        """
        self.send_kv("host_port", self.SERVER_PORT)

    def teardown(self):
        """
        Stops the UDP server, waits for its thread and closes its socket.
        """
        if self.server:
            self.server.shutdown()
            self.server_thread.join()
            # shutdown() only stops the serve_forever() loop; the listening
            # socket has to be closed separately.
            self.server.server_close()
            self.server = None
            self.server_thread = None