Version of easy-connect with the u-blox cellular platforms C027 and C030 added.

Dependents:   HelloMQTT

Revision:
0:19aa55d66228
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/esp8266-driver/TESTS/net/host_tests/udp_shotgun.py	Thu Aug 10 14:33:05 2017 +0000
@@ -0,0 +1,142 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import sys
+import socket
+import json
+import random
+import itertools
+import time
+from sys import stdout
+from threading import Thread
+from SocketServer import BaseRequestHandler, UDPServer
+from mbed_host_tests import BaseHostTest, event_callback
+
+
+class UDPEchoClientHandler(BaseRequestHandler):
+    def handle(self):
+        """ UDP packet handler. Responds with multiple simultaneous packets
+        """
+        data, sock = self.request
+        pattern = [ord(d) << 4 for d in data]
+
+        # Each byte in request indicates size of packet to recieve
+        # Each packet size is shifted over by 4 to fit in a byte, which
+        # avoids any issues with endianess or decoding
+        for packet in pattern:
+            data = [random.randint(0, 255) for _ in range(packet-1)]
+            data.append(reduce(lambda a,b: a^b, data))
+            data = ''.join(map(chr, data))
+            sock.sendto(data, self.client_address)
+
+            # Sleep a tiny bit to compensate for local network
+            time.sleep(0.01)
+
+
+class UDPEchoClientTest(BaseHostTest):
+    def __init__(self):
+        """
+        Initialise test parameters.
+
+        :return:
+        """
+        BaseHostTest.__init__(self)
+        self.SERVER_IP = None # Will be determined after knowing the target IP
+        self.SERVER_PORT = 0  # Let TCPServer choose an arbitrary port
+        self.server = None
+        self.server_thread = None
+        self.target_ip = None
+
+    @staticmethod
+    def find_interface_to_target_addr(target_ip):
+        """
+        Finds IP address of the interface through which it is connected to the target.
+
+        :return:
+        """
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        try:
+            s.connect((target_ip, 0)) # Target IP, any port
+        except socket.error:
+            s.connect((target_ip, 8000)) # Target IP, 'random' port
+        ip = s.getsockname()[0]
+        s.close()
+        return ip
+
+    def setup_udp_server(self):
+        """
+        sets up a UDP server for target to connect and send test data.
+
+        :return:
+        """
+        # !NOTE: There should mechanism to assert in the host test
+        if self.SERVER_IP is None:
+            self.log("setup_udp_server() called before determining server IP!")
+            self.notify_complete(False)
+
+        # Returning none will suppress host test from printing success code
+        self.server = UDPServer((self.SERVER_IP, self.SERVER_PORT), UDPEchoClientHandler)
+        ip, port = self.server.server_address
+        self.SERVER_PORT = port
+        self.server.allow_reuse_address = True
+        self.log("HOST: Listening for UDP packets: " + self.SERVER_IP + ":" + str(self.SERVER_PORT))
+        self.server_thread = Thread(target=UDPEchoClientTest.server_thread_func, args=(self,))
+        self.server_thread.start()
+
+    @staticmethod
+    def server_thread_func(this):
+        """
+        Thread function to run TCP server forever.
+
+        :param this:
+        :return:
+        """
+        this.server.serve_forever()
+
+    @event_callback("target_ip")
+    def _callback_target_ip(self, key, value, timestamp):
+        """
+        Callback to handle reception of target's IP address.
+
+        :param key:
+        :param value:
+        :param timestamp:
+        :return:
+        """
+        self.target_ip = value
+        self.SERVER_IP = self.find_interface_to_target_addr(self.target_ip)
+        self.setup_udp_server()
+
+    @event_callback("host_ip")
+    def _callback_host_ip(self, key, value, timestamp):
+        """
+        Callback for request for host IP Addr
+
+        """
+        self.send_kv("host_ip", self.SERVER_IP)
+
+    @event_callback("host_port")
+    def _callback_host_port(self, key, value, timestamp):
+        """
+        Callback for request for host port
+        """
+        self.send_kv("host_port", self.SERVER_PORT)
+
+    def teardown(self):
+        if self.server:
+            self.server.shutdown()
+            self.server_thread.join()