Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.9.3.2 #156

Merged
merged 9 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.2.41
3.9.3.2
8 changes: 5 additions & 3 deletions pilot/control/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,16 @@ def execute_payloads(queues: namedtuple, traces: Any, args: object): # noqa: C9
job.t0 = os.times()
exit_code, diagnostics = payload_executor.run()
if exit_code and exit_code > 1000: # pilot error code, add to list
logger.debug(f'pilot error code received (code={exit_code}, diagnostics=\n{diagnostics})')
logger.warning(f'pilot error code received (code={exit_code}, diagnostics=\n{diagnostics})')
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=diagnostics)

logger.debug(f'run() returned exit_code={exit_code}')
set_cpu_consumption_time(job)
job.transexitcode = exit_code % 255
out.close()
err.close()
if out:
out.close()
if err:
err.close()

# some HPO jobs will produce new output files (following lfn name pattern), discover those and replace the job.outdata list
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
Expand Down
4 changes: 2 additions & 2 deletions pilot/info/jobdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,8 @@ def collect_zombies(self, depth: int = None):
for zombie in self.zombies:
try:
logger.info(f"zombie collector waiting for pid {zombie}")
_id, _ = os.waitpid(zombie, os.WNOHANG if current_depth else 0)
# dangerous, can lead to blocking : _id, _ = os.waitpid(zombie, os.WNOHANG if current_depth else 0)
_id, _ = os.waitpid(zombie, os.WNOHANG)
except OSError as exc:
logger.info(f"harmless exception when collecting zombies: {exc}")
zombies_to_remove.append(zombie)
Expand All @@ -1099,7 +1100,6 @@ def collect_zombies(self, depth: int = None):
# Remove collected zombies from the list
for zombie in zombies_to_remove:
self.zombies.remove(zombie)

if current_depth == 0:
break

Expand Down
2 changes: 1 addition & 1 deletion pilot/user/atlas/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def get_memory_monitor_info_path(workdir: str, allowtxtfile: bool = False) -> st
if os.path.exists(init_path):
path = init_path
else:
logger.info(f"neither {path}, nor {init_path} exist")
logger.debug(f"neither {path}, nor {init_path} exist")
path = ""

if path == "" and allowtxtfile:
Expand Down
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '41' # build number should be reset to '1' for every new development cycle
REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '2' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
7 changes: 5 additions & 2 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901

def read_output(stream, queue):
while True:
sleep(0.01)
try:
line = stream.readline()
if not line:
Expand All @@ -126,8 +127,10 @@ def read_output(stream, queue):
break
else:
raise

queue.put(line)
try:
queue.put_nowait(line)
except queue.Full:
sleep(0.01) # Sleep for a short interval to avoid busy waiting

stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))
Expand Down
2 changes: 1 addition & 1 deletion pilot/util/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i
return exit_code, diagnostics

# update the OOM process info to prevent killing processes in the wrong order in case the job is killed (once)
update_oom_info(job.pid, job.transformation)
# update_oom_info(job.pid, job.transformation)

# should the pilot abort the payload?
exit_code, diagnostics = should_abort_payload(current_time, mt)
Expand Down
Loading