Rtos API example

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers tcpecho_client_auto.py Source File

tcpecho_client_auto.py

00001 """
00002 mbed SDK
00003 Copyright (c) 2011-2013 ARM Limited
00004 
00005 Licensed under the Apache License, Version 2.0 (the "License");
00006 you may not use this file except in compliance with the License.
00007 You may obtain a copy of the License at
00008 
00009     http://www.apache.org/licenses/LICENSE-2.0
00010 
00011 Unless required by applicable law or agreed to in writing, software
00012 distributed under the License is distributed on an "AS IS" BASIS,
00013 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 See the License for the specific language governing permissions and
00015 limitations under the License.
00016 """
00017 
00018 import sys
00019 import socket
00020 from sys import stdout
00021 from SocketServer import BaseRequestHandler, TCPServer
00022 
00023 class TCPEchoClient_Handler(BaseRequestHandler):
00024     def handle(self):
00025         """ One handle per connection
00026         """
00027         print "HOST: Connection received...",
00028         count = 1;
00029         while True:
00030             data = self.request.recv(1024)
00031             if not data: break
00032             self.request.sendall(data)
00033             if '{{end}}' in str(data):
00034                 print
00035                 print str(data)
00036             else:
00037                 if not count % 10:
00038                     sys.stdout.write('.')
00039                 count += 1
00040             stdout.flush()
00041 
00042 class TCPEchoClientTest():
00043     def send_server_ip_port(self, selftest, ip_address, port_no):
00044         """ Set up network host. Reset target and and send server IP via serial to Mbed
00045         """
00046         c = selftest.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
00047         if c is None:
00048             self.print_result(selftest.RESULT_IO_SERIAL)
00049             return
00050 
00051         selftest.notify(c.strip())
00052         selftest.notify("HOST: Sending server IP Address to target...")
00053 
00054         connection_str = ip_address + ":" + str(port_no) + "\n"
00055         selftest.mbed.serial_write(connection_str)
00056         selftest.notify(connection_str)
00057 
00058         # Two more strings about connection should be sent by MBED
00059         for i in range(0, 2):
00060             c = selftest.mbed.serial_readline()
00061             if c is None:
00062                 selftest.print_result(self.RESULT_IO_SERIAL)
00063                 return
00064             selftest.notify(c.strip())
00065 
00066     def test(self, selftest):
00067         # We need to discover SERVEP_IP and set up SERVER_PORT
00068         # Note: Port 7 is Echo Protocol:
00069         #
00070         # Port number rationale:
00071         #
00072         # The Echo Protocol is a service in the Internet Protocol Suite defined
00073         # in RFC 862. It was originally proposed for testing and measurement
00074         # of round-trip times[citation needed] in IP networks.
00075         #
00076         # A host may connect to a server that supports the Echo Protocol using
00077         # the Transmission Control Protocol (TCP) or the User Datagram Protocol
00078         # (UDP) on the well-known port number 7. The server sends back an
00079         # identical copy of the data it received.
00080         SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
00081         SERVER_PORT = 7
00082 
00083         # Returning none will suppress host test from printing success code
00084         server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler)
00085         print "HOST: Listening for TCP connections: " + SERVER_IP + ":" + str(SERVER_PORT)
00086         self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
00087         server.serve_forever()