wifi test

Dependencies:   X_NUCLEO_IKS01A2 mbed-http

Committer:
JMF
Date:
Wed Sep 05 14:28:24 2018 +0000
Revision:
0:24d3eb812fd4
Initial commit

Who changed what in which revision?

UserRevisionLine numberNew contents of line
JMF 0:24d3eb812fd4 1 """
JMF 0:24d3eb812fd4 2 mbed SDK
JMF 0:24d3eb812fd4 3 Copyright (c) 2011-2013 ARM Limited
JMF 0:24d3eb812fd4 4
JMF 0:24d3eb812fd4 5 Licensed under the Apache License, Version 2.0 (the "License");
JMF 0:24d3eb812fd4 6 you may not use this file except in compliance with the License.
JMF 0:24d3eb812fd4 7 You may obtain a copy of the License at
JMF 0:24d3eb812fd4 8
JMF 0:24d3eb812fd4 9 http://www.apache.org/licenses/LICENSE-2.0
JMF 0:24d3eb812fd4 10
JMF 0:24d3eb812fd4 11 Unless required by applicable law or agreed to in writing, software
JMF 0:24d3eb812fd4 12 distributed under the License is distributed on an "AS IS" BASIS,
JMF 0:24d3eb812fd4 13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
JMF 0:24d3eb812fd4 14 See the License for the specific language governing permissions and
JMF 0:24d3eb812fd4 15 limitations under the License.
JMF 0:24d3eb812fd4 16 """
JMF 0:24d3eb812fd4 17
JMF 0:24d3eb812fd4 18 import sys
JMF 0:24d3eb812fd4 19 import socket
JMF 0:24d3eb812fd4 20 import json
JMF 0:24d3eb812fd4 21 import random
JMF 0:24d3eb812fd4 22 import itertools
JMF 0:24d3eb812fd4 23 import time
JMF 0:24d3eb812fd4 24 from sys import stdout
JMF 0:24d3eb812fd4 25 from threading import Thread
JMF 0:24d3eb812fd4 26 from SocketServer import BaseRequestHandler, UDPServer
JMF 0:24d3eb812fd4 27 from mbed_host_tests import BaseHostTest, event_callback
JMF 0:24d3eb812fd4 28
JMF 0:24d3eb812fd4 29
JMF 0:24d3eb812fd4 30 class UDPEchoClientHandler(BaseRequestHandler):
JMF 0:24d3eb812fd4 31 def handle(self):
JMF 0:24d3eb812fd4 32 """ UDP packet handler. Responds with multiple simultaneous packets
JMF 0:24d3eb812fd4 33 """
JMF 0:24d3eb812fd4 34 data, sock = self.request
JMF 0:24d3eb812fd4 35 pattern = [ord(d) << 4 for d in data]
JMF 0:24d3eb812fd4 36
JMF 0:24d3eb812fd4 37 # Each byte in request indicates size of packet to recieve
JMF 0:24d3eb812fd4 38 # Each packet size is shifted over by 4 to fit in a byte, which
JMF 0:24d3eb812fd4 39 # avoids any issues with endianess or decoding
JMF 0:24d3eb812fd4 40 for packet in pattern:
JMF 0:24d3eb812fd4 41 data = [random.randint(0, 255) for _ in range(packet-1)]
JMF 0:24d3eb812fd4 42 data.append(reduce(lambda a,b: a^b, data))
JMF 0:24d3eb812fd4 43 data = ''.join(map(chr, data))
JMF 0:24d3eb812fd4 44 sock.sendto(data, self.client_address)
JMF 0:24d3eb812fd4 45
JMF 0:24d3eb812fd4 46 # Sleep a tiny bit to compensate for local network
JMF 0:24d3eb812fd4 47 time.sleep(0.01)
JMF 0:24d3eb812fd4 48
JMF 0:24d3eb812fd4 49
JMF 0:24d3eb812fd4 50 class UDPEchoClientTest(BaseHostTest):
JMF 0:24d3eb812fd4 51 def __init__(self):
JMF 0:24d3eb812fd4 52 """
JMF 0:24d3eb812fd4 53 Initialise test parameters.
JMF 0:24d3eb812fd4 54
JMF 0:24d3eb812fd4 55 :return:
JMF 0:24d3eb812fd4 56 """
JMF 0:24d3eb812fd4 57 BaseHostTest.__init__(self)
JMF 0:24d3eb812fd4 58 self.SERVER_IP = None # Will be determined after knowing the target IP
JMF 0:24d3eb812fd4 59 self.SERVER_PORT = 0 # Let TCPServer choose an arbitrary port
JMF 0:24d3eb812fd4 60 self.server = None
JMF 0:24d3eb812fd4 61 self.server_thread = None
JMF 0:24d3eb812fd4 62 self.target_ip = None
JMF 0:24d3eb812fd4 63
JMF 0:24d3eb812fd4 64 @staticmethod
JMF 0:24d3eb812fd4 65 def find_interface_to_target_addr(target_ip):
JMF 0:24d3eb812fd4 66 """
JMF 0:24d3eb812fd4 67 Finds IP address of the interface through which it is connected to the target.
JMF 0:24d3eb812fd4 68
JMF 0:24d3eb812fd4 69 :return:
JMF 0:24d3eb812fd4 70 """
JMF 0:24d3eb812fd4 71 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
JMF 0:24d3eb812fd4 72 try:
JMF 0:24d3eb812fd4 73 s.connect((target_ip, 0)) # Target IP, any port
JMF 0:24d3eb812fd4 74 except socket.error:
JMF 0:24d3eb812fd4 75 s.connect((target_ip, 8000)) # Target IP, 'random' port
JMF 0:24d3eb812fd4 76 ip = s.getsockname()[0]
JMF 0:24d3eb812fd4 77 s.close()
JMF 0:24d3eb812fd4 78 return ip
JMF 0:24d3eb812fd4 79
JMF 0:24d3eb812fd4 80 def setup_udp_server(self):
JMF 0:24d3eb812fd4 81 """
JMF 0:24d3eb812fd4 82 sets up a UDP server for target to connect and send test data.
JMF 0:24d3eb812fd4 83
JMF 0:24d3eb812fd4 84 :return:
JMF 0:24d3eb812fd4 85 """
JMF 0:24d3eb812fd4 86 # !NOTE: There should mechanism to assert in the host test
JMF 0:24d3eb812fd4 87 if self.SERVER_IP is None:
JMF 0:24d3eb812fd4 88 self.log("setup_udp_server() called before determining server IP!")
JMF 0:24d3eb812fd4 89 self.notify_complete(False)
JMF 0:24d3eb812fd4 90
JMF 0:24d3eb812fd4 91 # Returning none will suppress host test from printing success code
JMF 0:24d3eb812fd4 92 self.server = UDPServer((self.SERVER_IP, self.SERVER_PORT), UDPEchoClientHandler)
JMF 0:24d3eb812fd4 93 ip, port = self.server.server_address
JMF 0:24d3eb812fd4 94 self.SERVER_PORT = port
JMF 0:24d3eb812fd4 95 self.server.allow_reuse_address = True
JMF 0:24d3eb812fd4 96 self.log("HOST: Listening for UDP packets: " + self.SERVER_IP + ":" + str(self.SERVER_PORT))
JMF 0:24d3eb812fd4 97 self.server_thread = Thread(target=UDPEchoClientTest.server_thread_func, args=(self,))
JMF 0:24d3eb812fd4 98 self.server_thread.start()
JMF 0:24d3eb812fd4 99
JMF 0:24d3eb812fd4 100 @staticmethod
JMF 0:24d3eb812fd4 101 def server_thread_func(this):
JMF 0:24d3eb812fd4 102 """
JMF 0:24d3eb812fd4 103 Thread function to run TCP server forever.
JMF 0:24d3eb812fd4 104
JMF 0:24d3eb812fd4 105 :param this:
JMF 0:24d3eb812fd4 106 :return:
JMF 0:24d3eb812fd4 107 """
JMF 0:24d3eb812fd4 108 this.server.serve_forever()
JMF 0:24d3eb812fd4 109
JMF 0:24d3eb812fd4 110 @event_callback("target_ip")
JMF 0:24d3eb812fd4 111 def _callback_target_ip(self, key, value, timestamp):
JMF 0:24d3eb812fd4 112 """
JMF 0:24d3eb812fd4 113 Callback to handle reception of target's IP address.
JMF 0:24d3eb812fd4 114
JMF 0:24d3eb812fd4 115 :param key:
JMF 0:24d3eb812fd4 116 :param value:
JMF 0:24d3eb812fd4 117 :param timestamp:
JMF 0:24d3eb812fd4 118 :return:
JMF 0:24d3eb812fd4 119 """
JMF 0:24d3eb812fd4 120 self.target_ip = value
JMF 0:24d3eb812fd4 121 self.SERVER_IP = self.find_interface_to_target_addr(self.target_ip)
JMF 0:24d3eb812fd4 122 self.setup_udp_server()
JMF 0:24d3eb812fd4 123
JMF 0:24d3eb812fd4 124 @event_callback("host_ip")
JMF 0:24d3eb812fd4 125 def _callback_host_ip(self, key, value, timestamp):
JMF 0:24d3eb812fd4 126 """
JMF 0:24d3eb812fd4 127 Callback for request for host IP Addr
JMF 0:24d3eb812fd4 128
JMF 0:24d3eb812fd4 129 """
JMF 0:24d3eb812fd4 130 self.send_kv("host_ip", self.SERVER_IP)
JMF 0:24d3eb812fd4 131
JMF 0:24d3eb812fd4 132 @event_callback("host_port")
JMF 0:24d3eb812fd4 133 def _callback_host_port(self, key, value, timestamp):
JMF 0:24d3eb812fd4 134 """
JMF 0:24d3eb812fd4 135 Callback for request for host port
JMF 0:24d3eb812fd4 136 """
JMF 0:24d3eb812fd4 137 self.send_kv("host_port", self.SERVER_PORT)
JMF 0:24d3eb812fd4 138
JMF 0:24d3eb812fd4 139 def teardown(self):
JMF 0:24d3eb812fd4 140 if self.server:
JMF 0:24d3eb812fd4 141 self.server.shutdown()
JMF 0:24d3eb812fd4 142 self.server_thread.join()