Skip to content

Commit d5f7f83

Browse files
authoredMar 24, 2025··
Merge pull request #987 from jiridanek/jd_fix_proxy__
ISSUE-922: chore(tests/containers): implement retry if port-forwarding fails
2 parents 1bd07a6 + 1bec78c commit d5f7f83

File tree

2 files changed

+47
-19
lines changed

2 files changed

+47
-19
lines changed
 

‎tests/containers/kubernetes_utils.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -225,16 +225,16 @@ def deploy(self, container_name: str) -> None:
225225
assert len(pod_name.items) == 1
226226
pod: kubernetes.client.models.v1_pod.V1Pod = pod_name.items[0]
227227

228-
p = socket_proxy.SocketProxy(exposing_contextmanager(core_v1_api, pod), "localhost", 0)
228+
p = socket_proxy.SocketProxy(lambda: exposing_contextmanager(core_v1_api, pod), "localhost", 0)
229229
t = threading.Thread(target=p.listen_and_serve_until_canceled)
230230
t.start()
231231
self.tf.defer(t, lambda thread: thread.join())
232232
self.tf.defer(p.cancellation_token, lambda token: token.cancel())
233233

234234
self.port = p.get_actual_port()
235235
LOGGER.debug(f"Listening on port {self.port}")
236-
resp = requests.get(f"http://localhost:{self.port}")
237-
assert resp.status_code == 200
236+
Wait.until("Connecting to pod succeeds", 1, 30,
237+
lambda: requests.get(f"http://localhost:{self.port}").status_code == 200)
238238
LOGGER.debug(f"Done with portforward")
239239

240240

@@ -344,6 +344,8 @@ def until(
344344
result: bool = ready()
345345
except KeyboardInterrupt:
346346
raise # quick exit if the user gets tired of waiting
347+
except SyntaxError: # this actually won't happen, but it's good to keep in mind
348+
raise # quick exit in cases developer obviously screwed up
347349
except Exception as e:
348350
exception_message = str(e)
349351

‎tests/containers/socket_proxy.py

+42-16
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
import logging
55
import socket
66
import select
7+
import struct
78
import threading
89
import subprocess
9-
import typing
10+
from typing import Callable, ContextManager
1011

1112
from tests.containers.cancellation_token import CancellationToken
1213

@@ -50,7 +51,7 @@ def stop(self):
5051
class SocketProxy:
5152
def __init__(
5253
self,
53-
remote_socket_factory: typing.ContextManager[socket.socket],
54+
remote_socket_factory: Callable[..., ContextManager[socket.socket]],
5455
local_host: str = "localhost",
5556
local_port: int = 0,
5657
buffer_size: int = 4096
@@ -81,9 +82,14 @@ def listen_and_serve_until_canceled(self):
8182
Handles at most one client at a time. """
8283
try:
8384
while not self.cancellation_token.cancelled:
84-
client_socket, addr = self.server_socket.accept()
85-
logging.info(f"Accepted connection from {addr[0]}:{addr[1]}")
86-
self._handle_client(client_socket)
85+
readable, _, _ = select.select([self.server_socket, self.cancellation_token], [], [])
86+
87+
# ISSUE-922: socket.accept() blocks, so if cancel() did not come very fast, we'd loop over and block
88+
if self.server_socket in readable:
89+
client_socket, addr = self.server_socket.accept()
90+
logging.info(f"Accepted connection from {addr[0]}:{addr[1]}")
91+
# handle client synchronously, which means that there can be at most one at a time
92+
self._handle_client(client_socket)
8793
except Exception as e:
8894
logging.exception(f"Proxying failed to listen", exc_info=e)
8995
raise
@@ -96,27 +102,39 @@ def get_actual_port(self) -> int:
96102
return self.server_socket.getsockname()[1]
97103

98104
def _handle_client(self, client_socket):
99-
with client_socket as _, self.remote_socket_factory as remote_socket:
100-
while True:
105+
with client_socket as _, self.remote_socket_factory() as remote_socket:
106+
while not self.cancellation_token.cancelled:
101107
readable, _, _ = select.select([client_socket, remote_socket, self.cancellation_token], [], [])
102108

103-
if self.cancellation_token.cancelled:
104-
break
105-
106109
if client_socket in readable:
107110
data = client_socket.recv(self.buffer_size)
108111
if not data:
109112
break
110113
remote_socket.send(data)
111114

112115
if remote_socket in readable:
113-
data = remote_socket.recv(self.buffer_size)
116+
try:
117+
data = remote_socket.recv(self.buffer_size)
118+
except ConnectionResetError:
119+
# ISSUE-922: it seems best to propagate the error and let the client retry
120+
# alternatively it would be necessary to resend anything already received from client_socket
121+
logging.info(f"Reading from remote socket failed, client {client_socket.getpeername()} has been disconnected")
122+
_rst_socket(client_socket)
123+
break
114124
if not data:
115125
break
116126
client_socket.send(data)
117127

118128

119-
if __name__ == "__main__":
129+
def _rst_socket(s: socket):
130+
"""Closing a SO_LINGER socket will RST it
131+
https://stackoverflow.com/questions/46264404/how-can-i-reset-a-tcp-socket-in-python
132+
"""
133+
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
134+
s.close()
135+
136+
137+
def main() -> None:
120138
"""Sample application to show how this can work."""
121139

122140

@@ -161,13 +179,21 @@ def get_actual_port(self):
161179
server.join()
162180

163181

164-
proxy = SocketProxy(remote_socket_factory(), "localhost", 0)
182+
proxy = SocketProxy(remote_socket_factory, "localhost", 0)
165183
thread = threading.Thread(target=proxy.listen_and_serve_until_canceled)
166184
thread.start()
167185

168-
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
169-
client_socket.connect(("localhost", proxy.get_actual_port()))
186+
for _ in range(2):
187+
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
188+
client_socket.connect(("localhost", proxy.get_actual_port()))
170189

171-
print(client_socket.recv(1024)) # prints Hello World
190+
print(client_socket.recv(1024)) # prints Hello World
191+
print(client_socket.recv(1024)) # prints nothing
192+
client_socket.close()
193+
proxy.cancellation_token.cancel()
172194

173195
thread.join()
196+
197+
198+
if __name__ == "__main__":
199+
main()

0 commit comments

Comments
 (0)
Please sign in to comment.