Knight KE / Mbed OS Game_Master
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers udpecho_server_auto.py Source File

udpecho_server_auto.py

00001 """
00002 mbed SDK
00003 Copyright (c) 2011-2013 ARM Limited
00004 
00005 Licensed under the Apache License, Version 2.0 (the "License");
00006 you may not use this file except in compliance with the License.
00007 You may obtain a copy of the License at
00008 
00009     http://www.apache.org/licenses/LICENSE-2.0
00010 
00011 Unless required by applicable law or agreed to in writing, software
00012 distributed under the License is distributed on an "AS IS" BASIS,
00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 See the License for the specific language governing permissions and
00015 limitations under the License.
00016 """
00017 from __future__ import print_function
00018 
00019 import re
00020 import sys
00021 import uuid
00022 from sys import stdout
00023 from socket import socket, AF_INET, SOCK_DGRAM
00024 
class UDPEchoServerTest():
    """Host-side check of a UDP echo server announced over the serial line.

    The test reads one line from ``selftest.mbed.serial_readline()``, extracts
    the ``a.b.c.d:port`` endpoint from it, then sends random datagrams to that
    endpoint and verifies that each one comes back unchanged.
    """
    ECHO_SERVER_ADDRESS = ""
    ECHO_PORT = 0
    s = None # Socket

    # Number of datagrams sent and verified during one test run.
    ECHO_LOOPS = 100

    # Raw string with escaped dots: an unescaped '.' matches any character,
    # so e.g. "192x168y0z1:7" would otherwise be accepted as an address.
    PATTERN_SERVER_IP = r"Server IP Address is (\d+)\.(\d+)\.(\d+)\.(\d+):(\d+)"
    re_detect_server_ip = re.compile(PATTERN_SERVER_IP)

    def test(self, selftest):
        """Run the echo test and return one of the ``selftest.RESULT_*`` codes.

        Returns:
            ``RESULT_IO_SERIAL`` when no serial line could be read,
            ``RESULT_ERROR`` when the UDP socket cannot be created,
            ``RESULT_SUCCESS`` when every datagram is echoed back verbatim,
            ``RESULT_FAILURE`` otherwise (no endpoint found in the serial
            line, or an echo mismatch).

        Errors raised by ``sendto``/``recv`` still propagate to the caller;
        the socket is closed before they do.
        """
        serial_ip_msg = selftest.mbed.serial_readline()
        if serial_ip_msg is None:
            return selftest.RESULT_IO_SERIAL
        selftest.notify(serial_ip_msg)

        result = False
        try:
            # Searching for IP address and port prompted by server
            m = self.re_detect_server_ip.search(serial_ip_msg)
            if m:
                fields = m.groups()
                self.ECHO_SERVER_ADDRESS = ".".join(fields[:4])
                self.ECHO_PORT = int(fields[4]) # must be integer for the socket address tuple
                selftest.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))

                # We assume this test fails so can't send 'error' message to server
                try:
                    self.s = socket(AF_INET, SOCK_DGRAM)
                except Exception as e:
                    self.s = None
                    selftest.notify("HOST: Socket error: %s"% e)
                    return selftest.RESULT_ERROR

                result = self._echo_all()
        finally:
            # Always release the socket, including when sendto/recv raised.
            if self.s is not None:
                self.s.close()
        return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

    def _echo_all(self):
        """Send ECHO_LOOPS random datagrams; return True if all echo back verbatim."""
        server = (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT)
        for _ in range(self.ECHO_LOOPS):
            # Sockets carry bytes: encode once and compare bytes to bytes so
            # the check behaves the same on Python 2 and Python 3.
            payload = str(uuid.uuid4()).encode("ascii")
            self.s.sendto(payload, server)
            if self.s.recv(len(payload)) != payload:
                return False
            # Progress indicator; write and flush the same stream object.
            sys.stdout.write('.')
            sys.stdout.flush()
        return True