Skip to content

Commit 012cfba

Browse files
authored
Fix UDPRadio atexit_timeout and use it in tests (#3899)
This parameter wasn't wired through, but because there was a default deeper in the code, nothing detected that. See #2973 for some context around this deep-default coding style: this PR removes the duplicated defaults and leaves the default only on the user facing UDPRadio class. This PR uses the now-working parameter to make UDPRadioReceiver shutdown faster in tests: because in these test cases, we know that we have received all messages that we will process, there is no need to wait for any further outstanding messages. This saves around 9 seconds of test runtime - both theoretically (3 tests, each 3 second saving) and in practice on my laptop. Now that this is wired through properly, a type mismatch between the original atexit_timeout (int) and the unwired UDPRadio parameter (float | int) is revealed and fixed: everything uses the original int type now. ## Type of change - Bug fix
1 parent 9c2ff0e commit 012cfba

File tree

4 files changed

+15
-10
lines changed

4 files changed

+15
-10
lines changed

parsl/monitoring/radios/udp.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import pickle
33
import socket
44
from multiprocessing.queues import Queue
5-
from typing import Optional, Union
5+
from typing import Optional
66

77
from parsl.monitoring.radios.base import (
88
MonitoringRadioReceiver,
@@ -15,7 +15,7 @@
1515

1616

1717
class UDPRadio(RadioConfig):
18-
def __init__(self, *, port: Optional[int] = None, atexit_timeout: Union[int, float] = 3, address: str, debug: bool = False):
18+
def __init__(self, *, port: Optional[int] = None, atexit_timeout: int = 3, address: str, debug: bool = False):
1919
self.port = port
2020
self.atexit_timeout = atexit_timeout
2121
self.address = address
@@ -29,7 +29,8 @@ def create_receiver(self, run_dir: str, resource_msgs: Queue) -> MonitoringRadio
2929
udp_receiver = start_udp_receiver(logdir=run_dir,
3030
monitoring_messages=resource_msgs,
3131
port=self.port,
32-
debug=self.debug
32+
debug=self.debug,
33+
atexit_timeout=self.atexit_timeout
3334
)
3435
self.port = udp_receiver.port
3536
return udp_receiver

parsl/monitoring/radios/udp_router.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def __init__(self,
3838
udp_port: Optional[int] = None,
3939
run_dir: str = ".",
4040
logging_level: int = logging.INFO,
41-
atexit_timeout: int = 3, # in seconds
41+
atexit_timeout: int, # in seconds
4242
resource_msgs: mpq.Queue,
4343
exit_event: Event,
4444
):
@@ -129,14 +129,16 @@ def udp_router_starter(*,
129129
udp_port: Optional[int],
130130

131131
run_dir: str,
132-
logging_level: int) -> None:
132+
logging_level: int,
133+
atexit_timeout: int) -> None:
133134
setproctitle("parsl: monitoring UDP router")
134135
try:
135136
router = MonitoringRouter(udp_port=udp_port,
136137
run_dir=run_dir,
137138
logging_level=logging_level,
138139
resource_msgs=resource_msgs,
139-
exit_event=exit_event)
140+
exit_event=exit_event,
141+
atexit_timeout=atexit_timeout)
140142
except Exception as e:
141143
logger.error("MonitoringRouter construction failed.", exc_info=True)
142144
comm_q.put(f"Monitoring router construction failed: {e}")
@@ -165,7 +167,8 @@ def start_udp_receiver(*,
165167
monitoring_messages: Queue,
166168
port: Optional[int],
167169
logdir: str,
168-
debug: bool) -> UDPRadioReceiver:
170+
debug: bool,
171+
atexit_timeout: int) -> UDPRadioReceiver:
169172

170173
udp_comm_q: Queue[Union[int, str]]
171174
udp_comm_q = SizedQueue(maxsize=10)
@@ -179,6 +182,7 @@ def start_udp_receiver(*,
179182
"udp_port": port,
180183
"run_dir": logdir,
181184
"logging_level": logging.DEBUG if debug else logging.INFO,
185+
"atexit_timeout": atexit_timeout,
182186
},
183187
name="Monitoring-UDP-Router-Process",
184188
daemon=True,

parsl/tests/test_monitoring/test_basic.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def this_app():
2828
# a configuration that is suitably configured for monitoring.
2929

3030
def thread_config():
31-
c = Config(executors=[ThreadPoolExecutor(remote_monitoring_radio=UDPRadio(address="localhost"))],
31+
c = Config(executors=[ThreadPoolExecutor(remote_monitoring_radio=UDPRadio(address="localhost", atexit_timeout=0))],
3232
monitoring=MonitoringHub(resource_monitoring_interval=0))
3333
return c
3434

@@ -47,7 +47,7 @@ def htex_udp_config():
4747
ex = c.executors[0]
4848

4949
assert isinstance(ex.remote_monitoring_radio, HTEXRadio), "precondition: htex is configured for the HTEXRadio"
50-
ex.remote_monitoring_radio = UDPRadio(address="localhost")
50+
ex.remote_monitoring_radio = UDPRadio(address="localhost", atexit_timeout=0)
5151

5252
return c
5353

parsl/tests/test_monitoring/test_radio_udp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def test_udp(tmpd_cwd):
1515

1616
resource_msgs = SpawnQueue()
1717

18-
radio_config = UDPRadio(address="localhost")
18+
radio_config = UDPRadio(address="localhost", atexit_timeout=0)
1919

2020
# start receiver
2121
udp_receiver = radio_config.create_receiver(run_dir=str(tmpd_cwd),

0 commit comments

Comments
 (0)