diff --git a/tests/gold_tests/autest-site/ports.py b/tests/gold_tests/autest-site/ports.py index bc4de3c1e32..0b8ec00e572 100644 --- a/tests/gold_tests/autest-site/ports.py +++ b/tests/gold_tests/autest-site/ports.py @@ -16,6 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio from typing import Set import socket import subprocess @@ -30,6 +31,72 @@ g_ports = None # ports we can use +class AsyncPortQueue(OrderedSetQueue): + + def __init__(self): + super().__init__() + self._listening_ports = _get_listening_ports() + + async def select_available(self, amount, dmin, dmax): + rmin = dmin - 2000 + rmax = 65536 - dmax + + port_tasks = [] + await asyncio.gather(*port_tasks) + if rmax > amount: + # Fill in ports, starting above the upper OS-usable port range. + port = dmax + 1 + while port < 65536 and self.qsize() < amount: + port_tasks.append(self._check_port(port)) + port += 1 + if rmin > amount and self.qsize() < amount: + port = 2001 + # Fill in more ports, starting at 2001, well above well known ports, + # and going up until the minimum port range used by the OS. + while port < dmin and self.qsize() < amount: + port_tasks.append(self._check_port(port)) + port += 1 + + await asyncio.gather(*port_tasks) + + async def _check_port(self, port): + if await self._is_port_open(port): + host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}") + else: + host.WriteDebug('_setup_port_queue', f"Adding a possible port to connect to: {port}") + self.put(port) + + async def _is_port_open(self, port, address=None): + ret = False + if address is None: + address = "localhost" + + if port in self._listening_ports: + host.WriteDebug('PortOpen', f"{port} is open because it is in the listening sockets set.") + return True + + try: + # Try to connect on that port. If we can connect on it, then someone is + # listening on that port and therefore the port is open. + reader, writer = await asyncio.open_connection(address, port, limit=1) + writer.close() + await writer.wait_closed() + ret = True + host.WriteDebug( + 'PortOpen', f"Connection to port {port} succeeded, the port is open, " + "and a future connection cannot use it") + except ConnectionRefusedError: + host.WriteDebug( + 'PortOpen', f"socket error for port {port}, port is closed, " + "and therefore a future connection can use it") + except TimeoutError: + host.WriteDebug( + 'PortOpen', f"Timeout error for port {port}, port is closed, " + "and therefore a future connection can use it") + + return ret + + class PortQueueSelectionError(Exception): """ An exception for when there are problems selecting a port from the port @@ -147,13 +214,11 @@ def _setup_port_queue(amount=1000): Build up the set of ports that the OS in theory will not use. """ global g_ports - if g_ports is None: - host.WriteDebug('_setup_port_queue', "Populating the port queue.") - g_ports = OrderedSetQueue() - else: + if g_ports is not None: # The queue has already been populated. host.WriteDebug('_setup_port_queue', f"Queue was previously populated. Queue size: {g_ports.qsize()}") return + try: # Use sysctl to find the range of ports that the OS publishes it uses. # some docker setups don't have sbin setup correctly @@ -171,31 +236,17 @@ def _setup_port_queue(amount=1000): host.WriteWarning("Unable to call sysctrl!\n Tests may fail because of bad port selection!") return - rmin = dmin - 2000 - rmax = 65536 - dmax + host.WriteDebug('_setup_port_queue', "Populating the port queue.") + g_ports = AsyncPortQueue() - listening_ports = _get_listening_ports() - if rmax > amount: - # Fill in ports, starting above the upper OS-usable port range. - port = dmax + 1 - while port < 65536 and g_ports.qsize() < amount: - if PortOpen(port, listening_ports=listening_ports): - host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}") - else: - host.WriteDebug('_setup_port_queue', f"Adding a possible port to connect to: {port}") - g_ports.put(port) - port += 1 - if rmin > amount and g_ports.qsize() < amount: - port = 2001 - # Fill in more ports, starting at 2001, well above well known ports, - # and going up until the minimum port range used by the OS. - while port < dmin and g_ports.qsize() < amount: - if PortOpen(port, listening_ports=listening_ports): - host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}") - else: - host.WriteDebug('_setup_port_queue', f"Adding a possible port to connect to: {port}") - g_ports.put(port) - port += 1 + async def async_setup(): + await g_ports.select_available(amount, dmin, dmax) + + try: + loop = asyncio.get_running_loop() + loop.call_soon(async_setup()) + except RuntimeError: + asyncio.run(async_setup()) def _get_port_by_bind():