Nicolas Borla / Mbed OS BBR_1Ebene
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers tcpecho_client_auto.py Source File

tcpecho_client_auto.py

00001 """
00002 mbed SDK
00003 Copyright (c) 2011-2013 ARM Limited
00004 
00005 Licensed under the Apache License, Version 2.0 (the "License");
00006 you may not use this file except in compliance with the License.
00007 You may obtain a copy of the License at
00008 
00009     http://www.apache.org/licenses/LICENSE-2.0
00010 
00011 Unless required by applicable law or agreed to in writing, software
00012 distributed under the License is distributed on an "AS IS" BASIS,
00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 See the License for the specific language governing permissions and
00015 limitations under the License.
00016 """
00017 from __future__ import print_function
00018 
00019 import sys
00020 import socket
00021 from sys import stdout
00022 try:
00023     from SocketServer import BaseRequestHandler, TCPServer
00024 except ImportError:
00025     from socketserver import BaseRequestHandler, TCPServer
00026 
00027 class TCPEchoClient_Handler(BaseRequestHandler):
00028     def handle(self):
00029         """ One handle per connection
00030         """
00031         print("HOST: Connection received...")
00032         count = 1;
00033         while True:
00034             data = self.request.recv(1024)
00035             if not data: break
00036             self.request.sendall(data)
00037             if '{{end}}' in str(data):
00038                 print
00039                 print(str(data))
00040             else:
00041                 if not count % 10:
00042                     sys.stdout.write('.')
00043                 count += 1
00044             stdout.flush()
00045 
00046 class TCPEchoClientTest():
00047     def send_server_ip_port(self, selftest, ip_address, port_no):
00048         """ Set up network host. Reset target and and send server IP via serial to Mbed
00049         """
00050         c = selftest.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
00051         if c is None:
00052             self.print_result(selftest.RESULT_IO_SERIAL)
00053             return
00054 
00055         selftest.notify(c.strip())
00056         selftest.notify("HOST: Sending server IP Address to target...")
00057 
00058         connection_str = ip_address + ":" + str(port_no) + "\n"
00059         selftest.mbed.serial_write(connection_str)
00060         selftest.notify(connection_str)
00061 
00062         # Two more strings about connection should be sent by MBED
00063         for i in range(0, 2):
00064             c = selftest.mbed.serial_readline()
00065             if c is None:
00066                 selftest.print_result(self.RESULT_IO_SERIAL)
00067                 return
00068             selftest.notify(c.strip())
00069 
00070     def test(self, selftest):
00071         # We need to discover SERVEP_IP and set up SERVER_PORT
00072         # Note: Port 7 is Echo Protocol:
00073         #
00074         # Port number rationale:
00075         #
00076         # The Echo Protocol is a service in the Internet Protocol Suite defined
00077         # in RFC 862. It was originally proposed for testing and measurement
00078         # of round-trip times[citation needed] in IP networks.
00079         #
00080         # A host may connect to a server that supports the Echo Protocol using
00081         # the Transmission Control Protocol (TCP) or the User Datagram Protocol
00082         # (UDP) on the well-known port number 7. The server sends back an
00083         # identical copy of the data it received.
00084         SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
00085         SERVER_PORT = 7
00086 
00087         # Returning none will suppress host test from printing success code
00088         server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler)
00089         print("HOST: Listening for TCP connections: %s:%s" %
00090               (SERVER_IP, str(SERVER_PORT)))
00091         self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
00092         server.serve_forever()