Rtos API example

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers tcpecho_server_auto.py Source File

tcpecho_server_auto.py

00001 """
00002 mbed SDK
00003 Copyright (c) 2011-2013 ARM Limited
00004 
00005 Licensed under the Apache License, Version 2.0 (the "License");
00006 you may not use this file except in compliance with the License.
00007 You may obtain a copy of the License at
00008 
00009     http://www.apache.org/licenses/LICENSE-2.0
00010 
00011 Unless required by applicable law or agreed to in writing, software
00012 distributed under the License is distributed on an "AS IS" BASIS,
00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 See the License for the specific language governing permissions and
00015 limitations under the License.
00016 """
00017 
00018 import re
00019 import sys
00020 import uuid
00021 import socket
00022 from sys import stdout
00023 
class TCPEchoServerTest():
    """Host-side check of a TCP echo server announced on the serial line.

    The test reads one line via ``selftest.mbed.serial_readline()``, looks
    for the announcement ``Server IP Address is a.b.c.d:port``, connects to
    that address and sends ``ECHO_LOOPs`` random UUID strings, verifying that
    each one is echoed back unchanged.
    """
    ECHO_SERVER_ADDRESS = ""
    ECHO_PORT = 0
    ECHO_LOOPs = 100
    # Seconds to wait for socket I/O once connected, so that a truncated or
    # missing echo ends the test instead of blocking forever.
    ECHO_TIMEOUT = 10.0
    s = None # Socket

    # Raw string with escaped dots: an unescaped '.' matches any character,
    # which would accept malformed addresses such as "192x168y0z1:7".
    PATTERN_SERVER_IP = r"Server IP Address is (\d+)\.(\d+)\.(\d+)\.(\d+):(\d+)"
    re_detect_server_ip = re.compile(PATTERN_SERVER_IP)

    def _close_socket(self):
        """Close the current socket, if any, and reset ``self.s`` to None."""
        sock, self.s = self.s, None
        if sock is not None:
            sock.close()

    def _recv_exact(self, nbytes):
        """Read up to ``nbytes`` bytes from ``self.s``.

        TCP is a byte stream, so one echo may arrive split over several
        ``recv`` calls. Reading stops early when the peer closes the
        connection or the socket timeout expires; the bytes gathered so far
        are returned so the caller can report exactly what was received.
        """
        chunks = []
        remaining = nbytes
        while remaining > 0:
            try:
                chunk = self.s.recv(remaining)
            except socket.timeout:
                break
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def test(self, selftest):
        """Run the echo test and return one of the ``selftest.RESULT_*`` codes.

        Returns:
            ``selftest.RESULT_IO_SERIAL`` when no serial line could be read,
            ``selftest.RESULT_ERROR`` on a socket error,
            ``selftest.RESULT_SUCCESS`` when every string was echoed back,
            ``selftest.RESULT_FAILURE`` otherwise (server not announced, an
            echo mismatch, or ``ECHO_LOOPs`` being zero).
        """
        c = selftest.mbed.serial_readline()
        if c is None:
            return selftest.RESULT_IO_SERIAL
        selftest.notify(c)

        m = self.re_detect_server_ip.search(c)
        if m is None:
            selftest.notify("HOST: TCP Server not found")
            return selftest.RESULT_FAILURE

        fields = m.groups()
        self.ECHO_SERVER_ADDRESS = ".".join(fields[:4])
        self.ECHO_PORT = int(fields[4]) # must be integer for socket.connect method
        selftest.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))

        # If the connection cannot be established there is no channel to the
        # server, so the problem can only be reported on the host side.
        try:
            self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
            self.s.settimeout(self.ECHO_TIMEOUT)
        except Exception as e:
            self._close_socket()
            selftest.notify("HOST: Socket error: %s"% e)
            return selftest.RESULT_ERROR

        result = False
        sys.stdout.write('HOST: Sending %d echo strings...'% self.ECHO_LOOPs)
        try:
            for _ in range(self.ECHO_LOOPs):
                test_string = str(uuid.uuid4())
                # Sockets carry bytes; a UUID string is pure ASCII.
                payload = test_string.encode('ascii')
                try:
                    self.s.sendall(payload)
                    data = self._recv_exact(len(payload))
                except Exception as e:
                    selftest.notify("HOST: Socket error: %s"% e)
                    return selftest.RESULT_ERROR

                if data == payload:
                    sys.stdout.write('.')
                    sys.stdout.flush()
                    result = True
                else:
                    print("Expected: ")
                    print("'%s'"% test_string)
                    print("received: ")
                    # repr() keeps non-printable bytes visible in the report
                    print(repr(data))
                    result = False
                    break
        finally:
            # Release the socket on every exit path, including errors.
            self._close_socket()
        return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE