Skip to content

Commit

Permalink
Separate send heartbeat out (#2356)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanTingHsieh authored Feb 8, 2024
1 parent 912adb9 commit 1bc7196
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
external_execution_wait: float = 5.0,
peer_read_timeout: Optional[float] = None,
monitor_interval: float = 0.01,
read_interval: float = 0.001,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 30.0,
workers: int = 4,
Expand Down
2 changes: 1 addition & 1 deletion nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(
external_execution_wait: float = 5.0,
peer_read_timeout: Optional[float] = None,
monitor_interval: float = 1.0,
read_interval: float = 0.1,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 30.0,
workers: int = 1,
Expand Down
6 changes: 3 additions & 3 deletions nvflare/app_common/executors/task_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class TaskExchanger(Executor):
def __init__(
self,
pipe_id: str,
read_interval: float = 0.1,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: Optional[float] = 30.0,
resend_interval: float = 2.0,
Expand All @@ -48,7 +48,7 @@ def __init__(
Args:
pipe_id (str): component id of pipe.
read_interval (float): how often to read from pipe.
Defaults to 0.1.
Defaults to 0.5.
heartbeat_interval (float): how often to send heartbeat to peer.
Defaults to 5.0.
heartbeat_timeout (float, optional): how long to wait for a
Expand Down Expand Up @@ -115,7 +115,7 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
self.pipe_handler.set_status_cb(self._pipe_status_cb)
self.pipe.open(self.pipe_channel_name)
self.pipe_handler.start()
elif event_type == EventType.END_RUN:
elif event_type == EventType.ABOUT_TO_END_RUN:
self.log_info(fl_ctx, "Stopping pipe handler")
if self.pipe_handler:
self.pipe_handler.notify_end("end_of_job")
Expand Down
10 changes: 10 additions & 0 deletions nvflare/fuel/utils/pipe/cell_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ def set_cell_cb(self, channel_name: str):
self.logger.info(f"registered CellPipe request CB for {self.channel}")

def send(self, msg: Message, timeout=None) -> bool:
"""Sends the specified message to the peer.
Args:
msg: the message to be sent
timeout: if specified, number of secs to wait for the peer to read the message.
If not specified, wait indefinitely.
Returns:
Whether the message is read by the peer.
"""
with self.pipe_lock:
if self.closed:
raise BrokenPipeError("pipe closed")
Expand Down
16 changes: 8 additions & 8 deletions nvflare/fuel/utils/pipe/file_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def clear(self):
self._clear_dir(self.y_path)
self._clear_dir(self.t_path)

def _monitor_file(self, file_path: str, timeout) -> bool:
def _monitor_file(self, file_path: str, timeout=None) -> bool:
"""Monitors the file until it's read-and-removed by peer, or timed out.
If timeout, remove the file.
Expand All @@ -147,16 +147,14 @@ def _monitor_file(self, file_path: str, timeout) -> bool:
Returns:
whether the file has been read and removed
"""
if not timeout:
return False
start = time.time()
while True:
if not self.pipe_path:
raise BrokenPipeError("pipe broken")

if not os.path.exists(file_path):
return True
if time.time() - start > timeout:
if timeout and time.time() - start > timeout:
# timed out - try to delete the file
try:
os.remove(file_path)
Expand Down Expand Up @@ -247,13 +245,15 @@ def y_get(self, timeout=None):
return self._get_from_dir(self.y_path, timeout)

def send(self, msg: Message, timeout=None) -> bool:
"""
"""Sends the specified message to the peer.
Args:
msg:
timeout:
msg: the message to be sent
timeout: if specified, number of secs to wait for the peer to read the message.
If not specified, wait indefinitely.
Returns: whether the message is read by peer (if timeout is specified)
Returns:
Whether the message is read by the peer.
"""
if not self.pipe_path:
Expand Down
11 changes: 7 additions & 4 deletions nvflare/fuel/utils/pipe/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,15 @@ def clear(self):

@abstractmethod
def send(self, msg: Message, timeout=None) -> bool:
"""Send the specified message to the peer.
"""Sends the specified message to the peer.
Args:
msg: the message to be sent
timeout: if specified, number of secs to wait for the peer to read the message.
If not specified, wait indefinitely.
Returns: whether the message is read by the peer.
If timeout is not specified, always return False.
Returns:
Whether the message is read by the peer.
"""
pass
Expand All @@ -117,8 +118,10 @@ def receive(self, timeout=None) -> Union[None, Message]:
Args:
timeout: how long (number of seconds) to try
If not specified, return right away.
Returns: the message received; or None if no message
Returns:
the message received; or None if no message
"""
pass
Expand Down
26 changes: 20 additions & 6 deletions nvflare/fuel/utils/pipe/pipe_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ def __init__(
self.peer_is_up_or_dead = threading.Event()
self._pause = False
self._last_heartbeat_received_time = None
self._check_interval = 0.01
self.heartbeat_sender = threading.Thread(target=self._heartbeat)
self.heartbeat_sender.daemon = True

def set_status_cb(self, cb, *args, **kwargs):
"""Set CB for status handling. When the peer status is changed (ABORT, END, GONE), this CB is called.
Expand Down Expand Up @@ -208,6 +211,9 @@ def start(self):
if not self.reader.is_alive():
self.reader.start()

if not self.heartbeat_sender.is_alive():
self.heartbeat_sender.start()

def stop(self, close_pipe=True):
"""Stops the handler and optionally close the monitored pipe.
Expand All @@ -231,7 +237,7 @@ def send_to_peer(self, msg: Message, timeout=None, abort_signal: Signal = None)
Args:
msg: message to be sent
timeout: how long to wait for the peer to read the data.
If not specified, return False immediately.
If not specified, will use ``self.default_request_timeout``.
abort_signal:
Returns:
Expand Down Expand Up @@ -285,15 +291,13 @@ def _read(self):

def _try_read(self):
self._last_heartbeat_received_time = time.time()
last_heartbeat_sent_time = 0.0
while not self.asked_to_stop:
now = time.time()

if self._pause:
time.sleep(self.read_interval)
continue

msg = self.pipe.receive()
now = time.time()

if msg:
self._last_heartbeat_received_time = now
Expand All @@ -318,13 +322,23 @@ def _try_read(self):
)
break

time.sleep(self.read_interval)
self.reader = None

def _heartbeat(self):
last_heartbeat_sent_time = 0.0
while not self.asked_to_stop:
if self._pause:
time.sleep(self._check_interval)
continue
now = time.time()

# send heartbeat to the peer
if now - last_heartbeat_sent_time > self.heartbeat_interval:
self.send_to_peer(self._make_event_message(Topic.HEARTBEAT, ""))
last_heartbeat_sent_time = now

time.sleep(self.read_interval)
self.reader = None
time.sleep(self._check_interval)

def get_next(self) -> Optional[Message]:
"""Gets the next message from the message queue.
Expand Down

0 comments on commit 1bc7196

Please sign in to comment.