Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
TCPSERVER_ACCEPT.py
"""
Copyright 2018 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import threading
import time

from icetea_lib.TestStepError import TestStepFail
from icetea_lib.bench import Bench
from interface import interfaceUp, interfaceDown


class Testcase(Bench):
    """Smoke test for TCPServer bind(), listen() and accept() between two DUTs.

    dut1 acts as the server and dut2 as the client. The server issues a
    blocking accept while a background thread makes the client connect after
    a fixed delay; the test then checks how long the accept took and that
    data sent over the accepted socket arrives at the client.
    """

    # Seconds the client thread waits before connecting. The same value is
    # used as the expected duration of the server's accept command, so both
    # places must always agree -- hence a single shared constant.
    _CONNECT_DELAY_S = 5

    def __init__(self):
        Bench.__init__(self,
                       name="TCPSERVER_ACCEPT",
                       title="TCPSERVER_ACCEPT",
                       purpose="Test that TCPServer::bind(), TCPServer::listen() and TCPServer::accept() works",
                       status="released",
                       component=["mbed-os", "netsocket"],
                       author="Juha Ylinen <juha.ylinen@arm.com>",
                       type="smoke",
                       subtype="socket",
                       requirements={
                           "duts": {
                               '*': {  # requirements for all nodes
                                   "count": 2,
                                   "type": "hardware",
                                   "application": {"name": "TEST_APPS-device-socket_app"}
                               },
                               "1": {"nick": "dut1"},
                               "2": {"nick": "dut2"}
                           }
                       }
                       )

    def setup(self):
        """Bring up the network interface on both DUTs and record their IPs."""
        interface = interfaceUp(self, ["dut1"])
        self.server_ip = interface["dut1"]["ip"]
        interface = interfaceUp(self, ["dut2"])
        self.client_ip = interface["dut2"]["ip"]

    def clientThread(self):
        """Open the client socket on dut2 and connect it to the server.

        Intended to run in a background thread started by case(). Reads
        self.client_socket_id, self.server_ip and self.used_port, which
        case() and setup() assign before the thread is started.
        """
        self.logger.info("Starting")
        # Give the server time to enter accept() before connecting.
        time.sleep(self._CONNECT_DELAY_S)
        client_cmd = "socket {}".format(self.client_socket_id)
        self.command("dut2", client_cmd + " open")
        self.command("dut2", "{} connect {} {}".format(client_cmd, self.server_ip, self.used_port))

    def case(self):
        """Run the accept scenario and verify timing and payload.

        Raises:
            TestStepFail: if the data received on dut2 differs from "hello".
        """
        self.used_port = 2000

        # Listening server socket on dut1.
        response = self.command("dut1", "socket new TCPServer")
        server_base_socket_id = int(response.parsed['socket_id'])
        listener_cmd = "socket {}".format(server_base_socket_id)

        self.command("dut1", listener_cmd + " open")
        self.command("dut1", "{} bind port {}".format(listener_cmd, self.used_port))
        self.command("dut1", listener_cmd + " listen")

        # Socket on dut1 that is handed to accept().
        response = self.command("dut1", "socket new TCPSocket")
        server_socket_id = int(response.parsed['socket_id'])
        self.command("dut1", "socket {} open".format(server_socket_id))

        # Client socket on dut2. The timedelta of this response is passed as
        # the `zero` reference to verify_response_duration below.
        response = self.command("dut2", "socket new TCPSocket")
        zero = response.timedelta
        self.client_socket_id = int(response.parsed['socket_id'])

        # Create a thread which calls client connect()
        client = threading.Thread(name='clientThread', target=self.clientThread)
        client.start()

        try:
            response = self.command("dut1", "{} accept {}".format(listener_cmd, server_socket_id))
            response.verify_response_duration(expected=self._CONNECT_DELAY_S, zero=zero,
                                              threshold_percent=10, break_in_fail=True)
            socket_id = int(response.parsed['socket_id'])
        finally:
            # Always wait for the client thread, even when accept or the
            # duration check fails, so it is not still driving dut2 while
            # the failure propagates and teardown runs.
            client.join()

        self.command("dut1", "socket {} send hello".format(socket_id))

        response = self.command("dut2", "socket {} recv 5".format(self.client_socket_id))
        # Drop ':' characters from the parsed data before comparing
        # (presumably separators added by the response parser -- confirm).
        data = response.parsed['data'].replace(":", "")

        if data != "hello":
            raise TestStepFail("Received data doesn't match the sent data")

        self.command("dut1", "socket {} delete".format(server_socket_id))
        self.command("dut1", listener_cmd + " delete")
        self.command("dut2", "socket {} delete".format(self.client_socket_id))

    def teardown(self):
        """Take the network interface down on both DUTs."""
        interfaceDown(self, ["dut1"])
        interfaceDown(self, ["dut2"])
Generated on Tue Aug 9 2022 00:37:21 by Doxygen 1.7.2