Skip to content

Commit

Permalink
fix race condition handling (NVIDIA#2728)
Browse files: browse the repository at this point in the history
  • Loading branch information
yanchengnv authored Jul 25, 2024
1 parent a276dc0 commit ce02675
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions nvflare/fuel/utils/pipe/pipe_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,19 @@ def _read(self):
def _try_read(self):
self._last_heartbeat_received_time = time.time()
while not self.asked_to_stop:
time.sleep(self.read_interval)
if self._pause:
time.sleep(self.read_interval)
continue

msg = self.pipe.receive()
# Bind self.pipe to a local variable p and call the pipe's methods through p,
# because self.pipe could be set to None at any moment (e.g. the abort process could
# stop the pipe handler at any time).
p = self.pipe
if not p:
# the pipe handler is most likely stopped, but we leave it for the while loop to decide
continue

msg = p.receive()
now = time.time()

if msg:
Expand All @@ -315,7 +323,7 @@ def _try_read(self):
else:
# is peer gone?
# ask the pipe for the last known active time of the peer
last_peer_active_time = self.pipe.get_last_peer_active_time()
last_peer_active_time = p.get_last_peer_active_time()
if last_peer_active_time > self._last_heartbeat_received_time:
self._last_heartbeat_received_time = last_peer_active_time

Expand All @@ -331,7 +339,6 @@ def _try_read(self):
)
break

time.sleep(self.read_interval)
self.reader = None

def _heartbeat(self):
Expand Down

0 comments on commit ce02675

Please sign in to comment.