diff --git a/nvflare/fuel/utils/pipe/pipe_handler.py b/nvflare/fuel/utils/pipe/pipe_handler.py index efbbcee134..49e199f853 100644 --- a/nvflare/fuel/utils/pipe/pipe_handler.py +++ b/nvflare/fuel/utils/pipe/pipe_handler.py @@ -296,11 +296,19 @@ def _read(self): def _try_read(self): self._last_heartbeat_received_time = time.time() while not self.asked_to_stop: + time.sleep(self.read_interval) if self._pause: - time.sleep(self.read_interval) continue - msg = self.pipe.receive() + # we assign self.pipe to p and access pipe methods through p + # this is because self.pipe could be set to None at any moment (e.g. the abort process could + # stop the pipe handler at any time). + p = self.pipe + if not p: + # the pipe handler is most likely stopped, but we leave it for the while loop to decide + continue + + msg = p.receive() now = time.time() if msg: @@ -315,7 +323,7 @@ def _try_read(self): else: # is peer gone? # ask the pipe for the last known active time of the peer - last_peer_active_time = self.pipe.get_last_peer_active_time() + last_peer_active_time = p.get_last_peer_active_time() if last_peer_active_time > self._last_heartbeat_received_time: self._last_heartbeat_received_time = last_peer_active_time @@ -331,7 +339,6 @@ def _try_read(self): ) break - time.sleep(self.read_interval) self.reader = None def _heartbeat(self):